Webman高并发PHP协程框架实战指南
13
阅读
Webman + Workerman:高并发PHP协程框架实战指南
本文深入讲解 Webman 框架的核心架构、协程编程模式以及如何构建高性能 PHP 服务端应用
引言
在 PHP 开发领域,传统 FPM 模式长期受困于并发性能瓶颈。随着 Swoole 和 Workerman 的兴起,PHP 开发者终于可以构建真正的高性能服务。Webman 作为基于 Workerman 的增强型框架,以其简洁的 API 和卓越的性能,成为 PHP 协程开发的首选方案。
一、技术架构解析
1.1 传统 PHP FPM vs Webman
1.2 Webman 核心架构
<?php
// webman 核心架构示意
require __DIR__ . '/vendor/autoload.php';
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
use Webman\Http\Request;
use Webman\Http\Response;
// 创建 HTTP Worker
$http_worker = new Worker('http://0.0.0.0:8787');
// 设置进程数(CPU 核心数 × 2-4)
$http_worker->count = 4;
// 进程启动回调
$http_worker->onWorkerStart = function($worker) {
echo "Worker {$worker->id} started\n";
// 初始化数据库连接池
// 初始化 Redis 连接池
};
// 消息处理回调
$http_worker->onMessage = function(TcpConnection $connection, Request $request) {
// 路由分发
$path = $request->path();
$method = $request->method();
// 业务逻辑处理
$response = handleRequest($path, $method, $request);
$connection->send($response);
};
// 启动所有 Worker
Worker::runAll();
二、核心功能实现
2.1 RESTful API 控制器
<?php
// app/controller/UserController.php
declare(strict_types=1);
namespace app\controller;
use support\Request;
use support\Response;
use app\model\User as UserModel;
use Webman\Exceptions\ValidateException;
class UserController
{
/**
* 获取用户列表
*/
public function index(Request $request): Response
{
$page = (int) $request->get('page', 1);
$perPage = (int) $request->get('per_page', 20);
$keyword = $request->get('keyword', '');
$query = UserModel::query();
if ($keyword) {
$query->where('username', 'like', "%{$keyword}%");
}
$users = $query->orderBy('id', 'desc')
->paginate($perPage, ['*'], 'page', $page);
return json([
'code' => 0,
'msg' => 'success',
'data' => $users
]);
}
/**
* 获取单个用户
*/
public function show(Request $request, int $id): Response
{
$user = UserModel::find($id);
if (!$user) {
return json([
'code' => 404,
'msg' => '用户不存在'
], 404);
}
return json([
'code' => 0,
'msg' => 'success',
'data' => $user
]);
}
/**
* 创建用户
*/
public function store(Request $request): Response
{
try {
$data = $request->post();
// 数据验证
$this->validate($data, [
'username' => 'required|alphaNum|min:3|max:20',
'email' => 'required|email',
'password' => 'required|min:6'
]);
$user = UserModel::create([
'username' => $data['username'],
'email' => $data['email'],
'password' => password_hash($data['password'], PASSWORD_DEFAULT)
]);
return json([
'code' => 0,
'msg' => '创建成功',
'data' => $user
], 201);
} catch (ValidateException $e) {
return json([
'code' => 422,
'msg' => $e->getMessage()
], 422);
}
}
/**
* 更新用户
*/
public function update(Request $request, int $id): Response
{
$user = UserModel::find($id);
if (!$user) {
return json([
'code' => 404,
'msg' => '用户不存在'
], 404);
}
$data = $request->post();
// 选择性更新
if (isset($data['nickname'])) {
$user->nickname = $data['nickname'];
}
if (isset($data['avatar'])) {
$user->avatar = $data['avatar'];
}
$user->save();
return json([
'code' => 0,
'msg' => '更新成功',
'data' => $user
]);
}
/**
* 删除用户
*/
public function destroy(Request $request, int $id): Response
{
$user = UserModel::find($id);
if (!$user) {
return json([
'code' => 404,
'msg' => '用户不存在'
], 404);
}
$user->delete();
return json([
'code' => 0,
'msg' => '删除成功'
]);
}
}
2.2 中间件系统
<?php
// app/middleware/AuthMiddleware.php
declare(strict_types=1);
namespace app\middleware;
use Webman\MiddlewareInterface;
use Webman\Http\Request;
use Webman\Http\Response;
use Firebase\JWT\JWT;
use Firebase\JWT\Key;
class AuthMiddleware implements MiddlewareInterface
{
public function process(Request $request, callable $handler): Response
{
// 获取 Token
$token = $request->header('Authorization');
if (!$token) {
return json([
'code' => 401,
'msg' => '未授权访问'
], 401);
}
// 移除 Bearer 前缀
$token = str_replace('Bearer ', '', $token);
try {
// 验证 JWT
$decoded = JWT::decode($token, new Key(config('app.jwt_secret'), 'HS256'));
// 将用户信息注入请求
$request->userId = $decoded->sub;
$request->userInfo = (array) $decoded->data;
return $handler($request);
} catch (\Exception $e) {
return json([
'code' => 401,
'msg' => 'Token 无效或已过期'
], 401);
}
}
}
<?php
// app/middleware/CorsMiddleware.php
declare(strict_types=1);
namespace app\middleware;
use Webman\MiddlewareInterface;
use Webman\Http\Request;
use Webman\Http\Response;
class CorsMiddleware implements MiddlewareInterface
{
public function process(Request $request, callable $handler): Response
{
if ($request->method() === 'OPTIONS') {
return new Response(200);
}
$response = $handler($request);
$response->header([
'Access-Control-Allow-Origin' => '*',
'Access-Control-Allow-Methods' => 'GET, POST, PUT, DELETE, OPTIONS',
'Access-Control-Allow-Headers' => 'Content-Type, Authorization, X-Requested-With',
'Access-Control-Max-Age' => '86400'
]);
return $response;
}
}
2.3 数据库连接池
<?php
// config/database.php
return [
'default' => 'mysql',
'connections' => [
'mysql' => [
'driver' => 'mysql',
'host' => '127.0.0.1',
'port' => 3306,
'database' => 'webman_app',
'username' => 'root',
'password' => '',
'charset' => 'utf8mb4',
'collation' => 'utf8mb4_unicode_ci',
'prefix' => '',
'pool' => [
'min_connections' => 1,
'max_connections' => 10,
'connect_timeout' => 10,
'wait_timeout' => 60,
'heartbeat' => -1,
'max_idle_time' => 60,
]
],
]
];
<?php
// 使用数据库
use support\Db;
use app\model\User;
class UserService
{
public function getUserWithPosts(int $userId): ?array
{
return Db::table('users')
->where('id', $userId)
->first();
}
public function batchGetUsers(array $ids): array
{
return Db::table('users')
->whereIn('id', $ids)
->get()
->toArray();
}
public function transactionExample(): void
{
Db::transaction(function () {
// 创建用户
$userId = Db::table('users')->insertGetId([
'username' => 'test',
'email' => 'test@example.com',
'created_at' => date('Y-m-d H:i:s')
]);
// 创建用户配置
Db::table('user_settings')->insert([
'user_id' => $userId,
'theme' => 'dark',
'language' => 'zh-CN'
]);
});
}
}
三、高性能实战技巧
3.1 Redis 缓存与分布式锁
<?php
// config/redis.php
return [
'default' => [
'host' => '127.0.0.1',
'password' => '',
'port' => 6379,
'database' => 0,
],
'cache' => [
'host' => '127.0.0.1',
'password' => '',
'port' => 6379,
'database' => 1,
],
'session' => [
'host' => '127.0.0.1',
'password' => '',
'port' => 6379,
'database' => 2,
]
];
<?php
// 使用 Redis
use support\Redis;
class CacheService
{
public function getUserCache(int $userId): ?array
{
$key = "user:{$userId}";
$cached = Redis::get($key);
if ($cached) {
return json_decode($cached, true);
}
// 缓存未命中,从数据库获取
$user = \app\model\User::find($userId);
if ($user) {
$data = $user->toArray();
// 缓存 1 小时
Redis::setex($key, 3600, json_encode($data));
return $data;
}
return null;
}
public function invalidateUserCache(int $userId): void
{
Redis::del("user:{$userId}");
}
// 分布式锁
public function acquireLock(string $key, int $ttl = 10): bool
{
return (bool) Redis::set($key, 1, ['NX', 'EX' => $ttl]);
}
public function releaseLock(string $key): void
{
Redis::del($key);
}
}
3.2 定时任务
<?php
// app/command/CleanupCommand.php
<?php
declare(strict_types=1);
namespace app\command;
use Carbon\Carbon;
use support\Db;
use support\Redis;
class CleanupCommand
{
public function run(): void
{
$this->cleanExpiredTokens();
$this->cleanOldLogs();
$this->syncOnlineUsers();
}
private function cleanExpiredTokens(): void
{
$expired = Db::table('password_resets')
->where('created_at', '<', Carbon::now()->subHours(24))
->delete();
echo "清理了 {$expired} 个过期token\n";
}
private function cleanOldLogs(): void
{
$deleted = Db::table('operation_logs')
->where('created_at', '<', Carbon::now()->subDays(30))
->delete();
echo "清理了 {$deleted} 条旧日志\n";
}
private function syncOnlineUsers(): void
{
// 清理 Redis 中过期的在线状态
$pattern = 'online_user_*';
$keys = Redis::keys($pattern);
foreach ($keys as $key) {
$ttl = Redis::ttl($key);
if ($ttl <= 0) {
Redis::del($key);
}
}
}
}
3.3 WebSocket 实时通讯
<?php
// app/websocket/ChatHandler.php
<?php
declare(strict_types=1);
namespace app\websocket;
use Workerman\Connection\AsyncTcpConnection;
use Workerman\Protocols\Http\Json;
class ChatHandler
{
/**
* WebSocket 连接时触发
*/
public function onConnect($connection): void
{
$connection->uid = null;
echo "新连接: {$connection->id}\n";
// 发送欢迎消息
$connection->send(Json::encode([
'type' => 'system',
'message' => '欢迎连接聊天服务',
'timestamp' => time()
]));
}
/**
* 收到消息时触发
*/
public function onMessage($connection, $data): void
{
$data = Json::decode($data);
switch ($data['type'] ?? 'unknown') {
case 'auth':
// 用户认证
$this->handleAuth($connection, $data);
break;
case 'chat':
// 聊天消息
$this->handleChat($connection, $data);
break;
case 'ping':
// 心跳
$connection->send(Json::encode(['type' => 'pong']));
break;
default:
echo "未知消息类型: {$data['type']}\n";
}
}
/**
* 处理用户认证
*/
private function handleAuth($connection, array $data): void
{
$token = $data['token'] ?? '';
// 验证 token(实际项目中从缓存/数据库验证)
if ($this->verifyToken($token)) {
$connection->uid = $data['user_id'];
$connection->send(Json::encode([
'type' => 'auth_success',
'user_id' => $data['user_id']
]));
} else {
$connection->send(Json::encode([
'type' => 'auth_failed',
'message' => '认证失败'
]));
}
}
/**
* 处理聊天消息
*/
private function handleChat($connection, array $data): void
{
if (!$connection->uid) {
$connection->send(Json::encode([
'type' => 'error',
'message' => '请先登录'
]));
return;
}
$message = [
'type' => 'message',
'from' => $connection->uid,
'content' => $data['content'] ?? '',
'timestamp' => time()
];
// 广播给所有在线用户
$connections = \Workerman\Worker::getConnections();
foreach ($connections as $conn) {
if ($conn->uid) {
$conn->send(Json::encode($message));
}
}
}
/**
* 连接关闭时触发
*/
public function onClose($connection): void
{
if ($connection->uid) {
echo "用户 {$connection->uid} 断开连接\n";
// 更新用户在线状态
}
}
private function verifyToken(string $token): bool
{
// 实际项目验证逻辑
return !empty($token);
}
}
四、性能优化实践
4.1 进程内存优化
<?php
// 内存优化最佳实践
// 1. 及时释放大变量
$largeData = null; // 显式释放
// 2. 使用生成器处理大数据
function getUsersFromDatabase(): Generator
{
foreach (Db::table('users')->cursor() as $user) {
yield $user;
}
}
// 3. 分批处理
function batchProcess(array $items, int $batchSize = 100): void
{
$chunks = array_chunk($items, $batchSize);
foreach ($chunks as $chunk) {
// 处理每批数据
processBatch($chunk);
// 手动垃圾回收
gc_collect_cycles();
}
}
4.2 请求级优化
<?php
// 性能优化中间件
class PerformanceMiddleware implements MiddlewareInterface
{
public function process(Request $request, callable $handler): Response
{
$startTime = microtime(true);
$response = $handler($request);
$executionTime = round((microtime(true) - $startTime) * 1000, 2);
$response->header([
'X-Response-Time' => "{$executionTime}ms",
'X-Request-ID' => $request->id ?? ''
]);
// 慢请求告警(超过 1 秒)
if ($executionTime > 1000) {
log()->warning("慢请求: {$request->path()} 耗时 {$executionTime}ms");
}
return $response;
}
}
五、部署与监控
5.1 Nginx 配置
# /etc/nginx/conf.d/webman.conf
upstream webman {
server 127.0.0.1:8787;
# 如果多进程运行
server 127.0.0.1:8788;
server 127.0.0.1:8789;
server 127.0.0.1:8790;
}
server {
listen 80;
server_name your-domain.com;
root /var/www/webman/public;
index index.php;
# 静态资源
location /static/ {
expires 30d;
add_header Cache-Control "public, immutable";
}
# WebSocket 升级
location /ws {
proxy_pass http://webman;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_read_timeout 60s;
}
# HTTP 代理
location / {
proxy_pass http://webman;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
}
5.2 Systemd 服务
# /etc/systemd/system/webman.service
[Unit]
Description=Webman Service
After=network.target
[Service]
Type=forking
User=www-data
Group=www-data
WorkingDirectory=/var/www/webman
ExecStart=/usr/bin/php /var/www/webman/start.php start -d
ExecReload=/usr/bin/php /var/www/webman/start.php reload
ExecStop=/usr/bin/php /var/www/webman/start.php stop
Restart=always
RestartSec=5
[Install]
WantedBy=multi-user.target
六、总结
本文详细介绍了:
架构原理 - Webman 与传统 FPM 的性能对比
核心功能 - RESTful API、中间件、数据库/Redis 连接池
实战技巧 - 定时任务、WebSocket 实时通讯
性能优化 - 内存管理、请求级优化
部署运维 - Nginx 配置、Systemd 服务
参考文档: