Webman高并发PHP协程框架实战指南

Raingad Raingad
·
·
13 阅读

Webman + Workerman:高并发PHP协程框架实战指南

本文深入讲解 Webman 框架的核心架构、协程编程模式以及如何构建高性能 PHP 服务端应用

引言

在 PHP 开发领域,传统 FPM 模式长期受困于并发性能瓶颈。随着 Swoole 和 Workerman 的兴起,PHP 开发者终于可以构建真正的高性能服务。Webman 作为基于 Workerman 的增强型框架,以其简洁的 API 和卓越的性能,成为 PHP 协程开发的首选方案。


一、技术架构解析

1.1 传统 PHP FPM vs Webman

特性

传统 FPM

Webman

进程模型

请求级进程池

常驻内存进程

并发能力

数百

数万

内存占用

每次请求重建

常驻复用

启动开销

高(每次请求重新加载框架)

极低(仅进程启动时加载一次)

长连接支持

不支持

优秀

1.2 Webman 核心架构

<?php
// Illustrative sketch of the webman core: one Workerman HTTP worker made of
// several resident processes that all serve the same listening address.
require __DIR__ . '/vendor/autoload.php';

use Workerman\Worker;
use Workerman\Connection\TcpConnection;
use Webman\Http\Request;
use Webman\Http\Response;

// A bare Workerman worker builds Workerman\Protocols\Http\Request objects.
// Register the Webman subclass so the `Request` type declaration on the
// onMessage callback below is actually satisfied.
\Workerman\Protocols\Http::requestClass(Request::class);

// Create the HTTP worker.
$http_worker = new Worker('http://0.0.0.0:8787');

// Number of worker processes (rule of thumb from the article: CPU cores x 2-4).
$http_worker->count = 4;

// Runs once in every worker process right after it has started.
$http_worker->onWorkerStart = function ($worker) {
    echo "Worker {$worker->id} started\n";
    // Initialise the database connection pool here.
    // Initialise the Redis connection pool here.
};

// Runs for every complete HTTP request received on a connection.
$http_worker->onMessage = function (TcpConnection $connection, Request $request) {
    // Routing inputs.
    $path = $request->path();
    $method = $request->method();

    // Business logic.
    // NOTE(review): handleRequest() is not defined in this snippet; it has to be
    // provided by the application before this script can run.
    $response = handleRequest($path, $method, $request);

    $connection->send($response);
};

// Start every registered worker (blocks until the service is stopped).
Worker::runAll();

二、核心功能实现

2.1 RESTful API 控制器

<?php
// app/controller/UserController.php
declare(strict_types=1);

namespace app\controller;

use support\Request;
use support\Response;
use app\model\User as UserModel;
use Webman\Exceptions\ValidateException;

/**
 * REST-style CRUD endpoints for users.
 *
 * Every action returns a JSON envelope of the form
 * `{code, msg, data?}` built with the `json()` helper.
 */
class UserController
{
    /**
     * Hard cap for the `per_page` query parameter so that one request cannot
     * ask the database for an arbitrarily large page.
     */
    private const MAX_PER_PAGE = 100;

    /**
     * List users, newest first, with an optional username search.
     *
     * Query parameters: `page` (default 1), `per_page` (default 20, clamped to
     * 1..MAX_PER_PAGE) and `keyword` (substring match on `username`).
     */
    public function index(Request $request): Response
    {
        // Clamp paging input: zero or negative values would otherwise reach the paginator.
        $page = max(1, (int) $request->get('page', 1));
        $perPage = min(self::MAX_PER_PAGE, max(1, (int) $request->get('per_page', 20)));
        $keyword = $request->get('keyword', '');

        $query = UserModel::query();

        // Compare against '' instead of relying on truthiness so that "0" is a valid keyword;
        // non-string input (e.g. keyword[]=x) is ignored.
        if (is_string($keyword) && $keyword !== '') {
            // Escape LIKE metacharacters so user input is matched literally.
            $escaped = addcslashes($keyword, '\\%_');
            $query->where('username', 'like', "%{$escaped}%");
        }

        $users = $query->orderBy('id', 'desc')
            ->paginate($perPage, ['*'], 'page', $page);

        return json([
            'code' => 0,
            'msg' => 'success',
            'data' => $users
        ]);
    }

    /**
     * Fetch a single user by primary key; responds with 404 when it does not exist.
     */
    public function show(Request $request, int $id): Response
    {
        $user = UserModel::find($id);

        if (!$user) {
            return $this->notFound();
        }

        return json([
            'code' => 0,
            'msg' => 'success',
            'data' => $user
        ]);
    }

    /**
     * Create a user from the POST body.
     *
     * Responds with 201 on success and 422 when validation fails.
     */
    public function store(Request $request): Response
    {
        try {
            $data = $request->post();

            // Input validation.
            // NOTE(review): validate() is not defined in this class and no parent
            // class or trait is visible here; confirm where it comes from.
            $this->validate($data, [
                'username' => 'required|alphaNum|min:3|max:20',
                'email' => 'required|email',
                'password' => 'required|min:6'
            ]);

            $user = UserModel::create([
                'username' => $data['username'],
                'email' => $data['email'],
                // Only the hash is persisted, never the plain-text password.
                'password' => password_hash($data['password'], PASSWORD_DEFAULT)
            ]);

            return json([
                'code' => 0,
                'msg' => '创建成功',
                'data' => $user
            ], 201);
        } catch (ValidateException $e) {
            return json([
                'code' => 422,
                'msg' => $e->getMessage()
            ], 422);
        }
    }

    /**
     * Partially update a user: only `nickname` and `avatar` may be changed.
     */
    public function update(Request $request, int $id): Response
    {
        $user = UserModel::find($id);

        if (!$user) {
            return $this->notFound();
        }

        $data = $request->post();

        // Copy only the whitelisted fields that were actually submitted.
        foreach (['nickname', 'avatar'] as $field) {
            if (isset($data[$field])) {
                $user->{$field} = $data[$field];
            }
        }

        $user->save();

        return json([
            'code' => 0,
            'msg' => '更新成功',
            'data' => $user
        ]);
    }

    /**
     * Delete a user by primary key; responds with 404 when it does not exist.
     */
    public function destroy(Request $request, int $id): Response
    {
        $user = UserModel::find($id);

        if (!$user) {
            return $this->notFound();
        }

        $user->delete();

        return json([
            'code' => 0,
            'msg' => '删除成功'
        ]);
    }

    /**
     * Build the shared "user not found" 404 response.
     */
    private function notFound(): Response
    {
        return json([
            'code' => 404,
            'msg' => '用户不存在'
        ], 404);
    }
}

2.2 中间件系统

<?php
// app/middleware/AuthMiddleware.php
declare(strict_types=1);

namespace app\middleware;

use Webman\MiddlewareInterface;
use Webman\Http\Request;
use Webman\Http\Response;
use Firebase\JWT\JWT;
use Firebase\JWT\Key;

/**
 * Authenticates a request from the JWT carried in the `Authorization` header.
 *
 * On success the token's `sub` and `data` claims are attached to the request
 * as `userId` / `userInfo` before the next handler runs; otherwise a 401 JSON
 * response is returned.
 */
class AuthMiddleware implements MiddlewareInterface
{
    public function process(Request $request, callable $handler): Response
    {
        $header = $request->header('Authorization');

        if (!is_string($header) || $header === '') {
            return $this->unauthorized('未授权访问');
        }

        // Strip the scheme only when it is a prefix; a blanket str_replace would
        // also remove "Bearer " from the middle of the value.
        $token = stripos($header, 'Bearer ') === 0 ? trim(substr($header, 7)) : $header;

        try {
            // Verify signature and registered claims.
            $decoded = JWT::decode($token, new Key(config('app.jwt_secret'), 'HS256'));
        } catch (\Exception $e) {
            // Only decoding problems are mapped to 401.
            return $this->unauthorized('Token 无效或已过期');
        }

        // A token without a subject cannot identify a user.
        if (!isset($decoded->sub)) {
            return $this->unauthorized('Token 无效或已过期');
        }

        // Expose the authenticated user to downstream handlers.
        $request->userId = $decoded->sub;
        $request->userInfo = (array) ($decoded->data ?? []);

        // Deliberately outside the try block: an exception thrown by the
        // controller must not be reported to the client as an invalid token.
        return $handler($request);
    }

    /**
     * Build the 401 JSON response with the given message.
     */
    private function unauthorized(string $message): Response
    {
        return json([
            'code' => 401,
            'msg' => $message
        ], 401);
    }
}
<?php
// app/middleware/CorsMiddleware.php
declare(strict_types=1);

namespace app\middleware;

use Webman\MiddlewareInterface;
use Webman\Http\Request;
use Webman\Http\Response;

/**
 * Adds permissive CORS headers to every response, including the answer to
 * the browser's OPTIONS preflight request.
 */
class CorsMiddleware implements MiddlewareInterface
{
    public function process(Request $request, callable $handler): Response
    {
        // Preflight requests are answered here and never reach the controller,
        // but they still need the CORS headers below or the browser will block
        // the real request.
        $response = $request->method() === 'OPTIONS'
            ? new Response(200)
            : $handler($request);

        // withHeaders() is the array form; header() takes a single name/value pair.
        $response->withHeaders([
            'Access-Control-Allow-Origin' => '*',
            'Access-Control-Allow-Methods' => 'GET, POST, PUT, DELETE, OPTIONS',
            'Access-Control-Allow-Headers' => 'Content-Type, Authorization, X-Requested-With',
            'Access-Control-Max-Age' => '86400'
        ]);

        return $response;
    }
}

2.3 数据库连接池

<?php
// config/database.php
// Database configuration: one MySQL connection named "mysql", used as the default.
return [
    // Name of the connection used when none is requested explicitly.
    'default' => 'mysql',
    
    'connections' => [
        'mysql' => [
            'driver' => 'mysql',
            'host' => '127.0.0.1',
            'port' => 3306,
            'database' => 'webman_app',
            // NOTE(review): root with an empty password — presumably a local
            // development placeholder; do not ship these credentials.
            'username' => 'root',
            'password' => '',
            'charset' => 'utf8mb4',
            'collation' => 'utf8mb4_unicode_ci',
            'prefix' => '',
            // Connection-pool settings.
            // NOTE(review): confirm these option names and units against the
            // database documentation of the installed webman version.
            'pool' => [
                'min_connections' => 1,
                'max_connections' => 10,
                'connect_timeout' => 10,
                'wait_timeout' => 60,
                'heartbeat' => -1,
                'max_idle_time' => 60,
            ]
        ],
    ]
];
<?php
// 使用数据库
use support\Db;
use app\model\User;

/**
 * Examples of using the `Db` query builder: single-row lookup, batch lookup
 * and a two-statement transaction.
 */
class UserService
{
    /**
     * Load one row from `users` by id.
     *
     * NOTE(review): despite the name, no posts are loaded here.
     *
     * @return array<string, mixed>|null the row as an associative array, or null when no row matches
     */
    public function getUserWithPosts(int $userId): ?array
    {
        $row = Db::table('users')
            ->where('id', $userId)
            ->first();

        // first() yields an object for a hit; returning it as-is would violate
        // the declared ?array return type.
        return $row === null ? null : (array) $row;
    }

    /**
     * Load all rows from `users` whose id is in $ids.
     *
     * @param array<int, int> $ids
     * @return array<int, mixed> the matching rows; empty when $ids is empty
     */
    public function batchGetUsers(array $ids): array
    {
        // Nothing to look up: skip the round trip to the database.
        if ($ids === []) {
            return [];
        }

        return Db::table('users')
            ->whereIn('id', $ids)
            ->get()
            ->toArray();
    }

    /**
     * Insert a user and its settings row inside a single transaction.
     */
    public function transactionExample(): void
    {
        Db::transaction(function () {
            // Create the user and keep the generated id.
            $userId = Db::table('users')->insertGetId([
                'username' => 'test',
                'email' => 'test@example.com',
                'created_at' => date('Y-m-d H:i:s')
            ]);

            // Create the settings row that references the new user.
            Db::table('user_settings')->insert([
                'user_id' => $userId,
                'theme' => 'dark',
                'language' => 'zh-CN'
            ]);
        });
    }
}

三、高性能实战技巧

3.1 Redis 缓存与分布式锁

<?php
// config/redis.php
// Redis configuration: three named connections to the same local server,
// separated only by database index.
return [
    // Default connection (database 0).
    'default' => [
        'host' => '127.0.0.1',
        'password' => '',
        'port' => 6379,
        'database' => 0,
    ],
    // Connection named "cache" (database 1).
    'cache' => [
        'host' => '127.0.0.1',
        'password' => '',
        'port' => 6379,
        'database' => 1,
    ],
    // Connection named "session" (database 2).
    'session' => [
        'host' => '127.0.0.1',
        'password' => '',
        'port' => 6379,
        'database' => 2,
    ]
];
<?php
// 使用 Redis
use support\Redis;

/**
 * Read-through user cache plus a simple Redis-based distributed lock.
 */
class CacheService
{
    /**
     * Owner tokens of the locks acquired through this instance, keyed by lock key.
     *
     * @var array<string, string>
     */
    private $lockTokens = [];

    /**
     * Return the user as an array, from Redis when cached, otherwise from the
     * database (and cache it for one hour).
     *
     * @return array<string, mixed>|null null when the user does not exist
     */
    public function getUserCache(int $userId): ?array
    {
        $key = "user:{$userId}";
        $cached = Redis::get($key);

        if ($cached) {
            $decoded = json_decode($cached, true);
            // A corrupt entry is treated as a miss instead of being returned as "no such user".
            if (is_array($decoded)) {
                return $decoded;
            }
        }

        // Cache miss: load from the database.
        $user = \app\model\User::find($userId);

        if ($user) {
            $data = $user->toArray();
            // Cache for one hour.
            Redis::setex($key, 3600, json_encode($data));
            return $data;
        }

        return null;
    }

    /**
     * Drop the cached copy of a user.
     */
    public function invalidateUserCache(int $userId): void
    {
        Redis::del("user:{$userId}");
    }

    /**
     * Try to take the lock $key for at most $ttl seconds.
     *
     * The lock value is a random token remembered by this instance, so that
     * releaseLock() can prove ownership.
     *
     * @return bool true when the lock was acquired, false when it is held by someone else
     */
    public function acquireLock(string $key, int $ttl = 10): bool
    {
        $token = bin2hex(random_bytes(16));

        // SET key token EX ttl NX — atomic "create with expiry if absent".
        $acquired = (bool) Redis::set($key, $token, 'EX', $ttl, 'NX');

        if ($acquired) {
            $this->lockTokens[$key] = $token;
        }

        return $acquired;
    }

    /**
     * Release a lock previously taken with acquireLock() on this instance.
     *
     * The key is deleted only if it still holds this instance's token; if the
     * lock expired and another holder took it, it is left untouched. Calling
     * this for a key that was not acquired here is a no-op.
     */
    public function releaseLock(string $key): void
    {
        if (!isset($this->lockTokens[$key])) {
            return;
        }

        $token = $this->lockTokens[$key];
        unset($this->lockTokens[$key]);

        // Compare-and-delete must be atomic, hence the server-side script.
        $script = <<<'LUA'
if redis.call('get', KEYS[1]) == ARGV[1] then
    return redis.call('del', KEYS[1])
end
return 0
LUA;

        Redis::eval($script, 1, $key, $token);
    }
}

3.2 定时任务

<?php
// app/command/CleanupCommand.php
declare(strict_types=1);

namespace app\command;

use Carbon\Carbon;
use support\Db;
use support\Redis;

/**
 * Periodic housekeeping: purges stale password-reset rows and operation logs
 * and removes online-status keys that have no remaining time to live.
 */
class CleanupCommand
{
    /**
     * Execute all cleanup steps in order.
     */
    public function run(): void
    {
        $this->cleanExpiredTokens();
        $this->cleanOldLogs();
        $this->syncOnlineUsers();
    }

    /**
     * Delete `password_resets` rows created more than 24 hours ago and print the count.
     */
    private function cleanExpiredTokens(): void
    {
        $cutoff = Carbon::now()->subHours(24);
        $removed = Db::table('password_resets')->where('created_at', '<', $cutoff)->delete();

        echo "清理了 {$removed} 个过期token\n";
    }

    /**
     * Delete `operation_logs` rows created more than 30 days ago and print the count.
     */
    private function cleanOldLogs(): void
    {
        $cutoff = Carbon::now()->subDays(30);
        $removed = Db::table('operation_logs')->where('created_at', '<', $cutoff)->delete();

        echo "清理了 {$removed} 条旧日志\n";
    }

    /**
     * Delete every `online_user_*` key whose reported TTL is zero or negative.
     */
    private function syncOnlineUsers(): void
    {
        foreach (Redis::keys('online_user_*') as $onlineKey) {
            // Keys with a positive TTL are still live; leave them alone.
            if (Redis::ttl($onlineKey) > 0) {
                continue;
            }

            Redis::del($onlineKey);
        }
    }
}

3.3 WebSocket 实时通讯

<?php
// app/websocket/ChatHandler.php
declare(strict_types=1);

namespace app\websocket;

use Workerman\Connection\AsyncTcpConnection;
use Workerman\Protocols\Http\Json;

/**
 * WebSocket chat handler: greets new connections, authenticates them and
 * broadcasts chat messages to every authenticated connection.
 *
 * All frames in both directions are JSON objects with a `type` field.
 */
class ChatHandler
{
    /**
     * Called when a WebSocket connection is established.
     */
    public function onConnect($connection): void
    {
        // Not authenticated yet.
        $connection->uid = null;
        echo "新连接: {$connection->id}\n";

        // Send the welcome message.
        $this->sendJson($connection, [
            'type' => 'system',
            'message' => '欢迎连接聊天服务',
            'timestamp' => time()
        ]);
    }

    /**
     * Called for every frame received; dispatches on the frame's `type`.
     */
    public function onMessage($connection, $data): void
    {
        $payload = json_decode((string) $data, true);

        // Anything that is not a JSON object cannot be dispatched.
        if (!is_array($payload)) {
            $this->sendJson($connection, [
                'type' => 'error',
                'message' => 'invalid JSON payload'
            ]);
            return;
        }

        $type = is_string($payload['type'] ?? null) ? $payload['type'] : 'unknown';

        switch ($type) {
            case 'auth':
                // User authentication.
                $this->handleAuth($connection, $payload);
                break;

            case 'chat':
                // Chat message.
                $this->handleChat($connection, $payload);
                break;

            case 'ping':
                // Heartbeat.
                $this->sendJson($connection, ['type' => 'pong']);
                break;

            default:
                echo "未知消息类型: {$type}\n";
        }
    }

    /**
     * Handle an `auth` frame: mark the connection as belonging to `user_id`
     * when the token is accepted.
     */
    private function handleAuth($connection, array $data): void
    {
        $token = is_string($data['token'] ?? null) ? $data['token'] : '';
        $userId = $data['user_id'] ?? null;

        // NOTE(review): user_id is supplied by the client. In a real project it
        // must be derived from the verified token, not trusted from the frame.
        if ($userId !== null && $this->verifyToken($token)) {
            $connection->uid = $userId;
            $this->sendJson($connection, [
                'type' => 'auth_success',
                'user_id' => $userId
            ]);
        } else {
            $this->sendJson($connection, [
                'type' => 'auth_failed',
                'message' => '认证失败'
            ]);
        }
    }

    /**
     * Handle a `chat` frame: broadcast it to every authenticated connection.
     */
    private function handleChat($connection, array $data): void
    {
        if (empty($connection->uid)) {
            $this->sendJson($connection, [
                'type' => 'error',
                'message' => '请先登录'
            ]);
            return;
        }

        // Encode once; the same frame goes to every recipient.
        $frame = json_encode([
            'type' => 'message',
            'from' => $connection->uid,
            'content' => $data['content'] ?? '',
            'timestamp' => time()
        ], JSON_UNESCAPED_UNICODE);

        // Connections are tracked on the worker instance that owns this connection.
        // NOTE(review): presumably this covers only the current worker process;
        // with several processes a shared channel is needed — confirm.
        foreach ($connection->worker->connections as $conn) {
            if (!empty($conn->uid)) {
                $conn->send($frame);
            }
        }
    }

    /**
     * Called when a connection is closed.
     */
    public function onClose($connection): void
    {
        if (!empty($connection->uid)) {
            echo "用户 {$connection->uid} 断开连接\n";
            // Update the user's online status here.
        }
    }

    /**
     * Placeholder token check: any non-empty token is accepted.
     */
    private function verifyToken(string $token): bool
    {
        // Replace with real verification in production.
        return !empty($token);
    }

    /**
     * JSON-encode $payload and send it as one frame on $connection.
     */
    private function sendJson($connection, array $payload): void
    {
        $connection->send(json_encode($payload, JSON_UNESCAPED_UNICODE));
    }
}

四、性能优化实践

4.1 进程内存优化

<?php
// 内存优化最佳实践

// 1. Release large variables as soon as they are no longer needed
$largeData = null; // drop the reference explicitly so the value can be freed

// 2. 使用生成器处理大数据
/**
 * Lazily iterate over every row of the `users` table.
 *
 * Rows are pulled from the query cursor one at a time and handed to the
 * caller as they arrive.
 */
function getUsersFromDatabase(): Generator
{
    $rows = Db::table('users')->cursor();

    foreach ($rows as $row) {
        yield $row;
    }
}

// 3. 分批处理
/**
 * Process $items in consecutive batches of at most $batchSize elements.
 *
 * Each batch is passed to processBatch(), followed by a forced
 * garbage-collection cycle before the next batch starts.
 */
function batchProcess(array $items, int $batchSize = 100): void
{
    foreach (array_chunk($items, $batchSize) as $batch) {
        // Handle one batch.
        processBatch($batch);

        // Collect reference cycles left behind by the batch.
        gc_collect_cycles();
    }
}

4.2 请求级优化

<?php
// 性能优化中间件
/**
 * Measures how long the rest of the middleware chain takes, reports it in
 * response headers and logs a warning for slow requests.
 */
class PerformanceMiddleware implements MiddlewareInterface
{
    public function process(Request $request, callable $handler): Response
    {
        $startTime = microtime(true);

        $response = $handler($request);

        // Elapsed wall-clock time in milliseconds, two decimals.
        $executionTime = round((microtime(true) - $startTime) * 1000, 2);

        // withHeaders() is the array form; header() takes a single name/value pair.
        $response->withHeaders([
            'X-Response-Time' => "{$executionTime}ms",
            'X-Request-ID' => (string) ($request->id ?? '')
        ]);

        // Slow-request warning (more than one second).
        if ($executionTime > 1000) {
            // The bare log() is PHP's math function; use the framework's Log facade.
            \support\Log::warning(sprintf('慢请求: %s 耗时 %sms', $request->path(), $executionTime));
        }

        return $response;
    }
}

五、部署与监控

5.1 Nginx 配置

# /etc/nginx/conf.d/webman.conf

upstream webman {
    # One webman instance serves all of its worker processes through a single
    # listening port, so one backend entry is enough.
    server 127.0.0.1:8787;

    # Enable further entries only when separate webman instances really listen
    # on those ports; otherwise nginx keeps trying backends that are down.
    # server 127.0.0.1:8788;
    # server 127.0.0.1:8789;
    # server 127.0.0.1:8790;
}

# Public virtual host: serves /static/ from disk and proxies everything else
# to the "webman" upstream.
server {
    listen 80;
    server_name your-domain.com;
    root /var/www/webman/public;
    index index.php;
    
    # Static assets: served from the document root with long-lived caching
    location /static/ {
        expires 30d;
        add_header Cache-Control "public, immutable";
    }
    
    # WebSocket upgrade: HTTP/1.1 plus Upgrade/Connection headers are required
    # for the protocol switch to pass through the proxy
    location /ws {
        proxy_pass http://webman;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        # NOTE(review): connections idle for longer than this are closed by
        # nginx — confirm it exceeds the client heartbeat interval.
        proxy_read_timeout 60s;
    }
    
    # Plain HTTP proxy; forwards the original host and client address
    location / {
        proxy_pass http://webman;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    }
}

5.2 Systemd 服务

# /etc/systemd/system/webman.service

# systemd unit that runs webman as a background service and restarts it on exit.
[Unit]
Description=Webman Service
# Start only after the network is available.
After=network.target

[Service]
# ExecStart passes -d; presumably that makes start.php fork into the
# background, which is why the type is "forking" — confirm.
Type=forking
# Run as the unprivileged web user instead of root.
User=www-data
Group=www-data
WorkingDirectory=/var/www/webman
ExecStart=/usr/bin/php /var/www/webman/start.php start -d
ExecReload=/usr/bin/php /var/www/webman/start.php reload
ExecStop=/usr/bin/php /var/www/webman/start.php stop
# Restart the service whenever it exits, after a 5 second pause.
Restart=always
RestartSec=5

[Install]
WantedBy=multi-user.target

六、总结

本文详细介绍了:

  1. 架构原理 - Webman 与传统 FPM 的性能对比

  2. 核心功能 - RESTful API、中间件、数据库连接池

  3. 实战技巧 - Redis 缓存与分布式锁、定时任务、WebSocket 实时通讯

  4. 性能优化 - 内存管理、请求级优化

  5. 部署运维 - Nginx 配置、Systemd 服务


参考文档:

评论

目录