安装workerman
composer require workerman/workerman
生成命令
php artisan make:command Workerman
namespace App\Console\Commands;use Illuminate\Console\Command;use Workerman\Worker;class Workerman extends Command{ protected $signature = 'Workerman {action} {--daemonize}'; protected $description = 'Command description'; public function __construct() { parent::__construct(); } public function handle() { global $argv;//定义全局变量 $arg = $this->argument('action'); $argv[1] = $arg; $argv[2] = $this->option('daemonize') ? '-d' : '';//该参数是以daemon(守护进程)方式启动 global $text_worker; // 创建一个Worker监听2347端口,使用websocket协议通讯 $text_worker = new Worker("websocket://0.0.0.0:2347"); $text_worker->uidConnections = array();//在线用户连接对象 $text_worker->uidInfo = array();//在线用户的用户信息 // 启动1个进程对外提供服务,这里设置多个进程在服务端发送客户端时会有问题 $text_worker->count = 1; //当启动workerman的时候 触发此方法 $text_worker->onWorkerStart =function(){ }; //当浏览器连接的时候触发此函数 $text_worker->onConnect = function($connection){ }; //向用户发送信息的时候触发 //$connection 当前连接的人的信息 $data 发送的数据 $text_worker->onMessage = function($connection,$data){ }; //浏览器断开链接的时候触发 $text_worker->onClose = function($connection){}; Worker::runAll(); }}
修改文件
App\Console\Kernel
这样就可以运行:php artisan Workerman start 命令
protected $commands = [ Commands\Workerman::class, ];
启动命令
# php artisan Workerman startWorkerman[artisan] start in DEBUG mode------------------------------------------- WORKERMAN --------------------------------------------Workerman version:4.1.6 PHP version:8.0.18 Event-Loop:\Workerman\Events\Select-------------------------------------------- WORKERS ---------------------------------------------proto user worker listen processes status tcp root none websocket://0.0.0.0:2347 1 [OK] --------------------------------------------------------------------------------------------------
浏览器之间通信
后端
//$connection 当前连接的人的信息 $data 发送的数据$text_worker->onMessage = function($connection,$data){ $data = json_decode($data, true); if($data['type']=='login'){ $this->create_uid($connection,$data); }};//创建uid方法public function create_uid($connection,$data){ global $text_worker; $connection->uid = $data['uid']; //保存用户的uid $text_worker->uidConnections["{$connection->uid}"] = $connection; //向自己的浏览器返回创建成功的信息 $connection->send("用户:[{$connection->uid}] 创建成功");}
打开两个新的页面、打开控制台选择 Console
// 第一个页面var socket = new WebSocket("ws://127.0.0.1:2347//ws");// 建立连接时触发 建立链接的时候,需要向workerman发送一条指令,告诉他我是谁,使用id或者用户标识作为uid,告诉workerman 例如,当前html 用户id是36socket.onopen = function(event) { console.log('连接开始...'); socket.send('{"uid":36,"type":"login"}');}//workerman发送消息的时候,接收并打印socket.onmessage = function(event) { var msg = event.data; console.log(msg );}//第二个页面var socket = new WebSocket("ws://127.0.0.1:2347//ws");// 建立连接时触发 建立链接的时候,需要向workerman发送一条指令,告诉他我是谁,使用id或者用户标识作为uid,告诉workerman 例如,当前html 用户id是37socket.onopen = function(event) { console.log('连接开始...'); socket.send('{"uid":37,"type":"login"}');}//workerman发送消息的时候,接收并打印socket.onmessage = function(event) { var msg = event.data; console.log(msg );}
向其他用户发送信息
后端
$text_worker->onMessage = function($connection,$data){ $data = json_decode($data, true); if($data['type']=='login'){ $this->create_uid($connection,$data); } if($data['type']=='send_message'){ $this->send_message($connection,$data); }};public function send_message($connection,$data){ global $text_worker; if(isset($data['to_uid'])){ var_dump($data['to_uid']); if(isset($text_worker->uidConnections["{$data['to_uid']}"])){ $to_connection=$text_worker->uidConnections["{$data['to_uid']}"]; $to_connection->send($data['uid'].$data['message']); } }}
页面,在uid为37的worker下
socket.send('{"type":"send_message","to_uid":36,"uid":37,"message":"nihao"}');
服务器向浏览器通信
workeman 监听一个本地发送的端口,在启动的时候
//当启动workerman的时候 触发此方法$text_worker->onWorkerStart =function(){ //监听一个内部端口,用来接收服务器的消息,转发给浏览器 $inner_text_worker = new Worker('text://127.0.0.1:5678'); $inner_text_worker->onMessage = function($connection_admin, $data) { global $text_worker; // $data数组格式,里面有uid,表示向那个uid的页面推送数据 $buffer = json_decode($data, true); var_dump($buffer); $to_uid = $buffer['to_uid']; var_dump($to_uid); // 通过workerman,向uid的页面推送数据 $connection = $text_worker->uidConnections[$to_uid]; $ret = $connection->send($data); // 返回推送结果 $connection_admin->send($ret ? 'ok' : 'fail'); }; $inner_text_worker->listen();};Route::get('/', function () { $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1); // 推送的数据,包含用户,表示是给这个用户推送 $data = array ('uid'=>37,'group'=>'admin', 'message'=>'发送成功啦', 'to_uid'=>36); // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符 fwrite($client, json_encode($data)."\n"); echo fread($client, 8192);});
添加心跳的完整代码
namespace App\Console\Commands;use Illuminate\Console\Command;use Workerman\Worker;use Workerman\Timer;// 心跳间隔55秒define('HEARTBEAT_TIME', 55);class Workerman extends Command{ protected $signature = 'Workerman {action} {--daemonize}'; protected $description = 'Command description'; public function __construct() { parent::__construct(); } public function handle() { global $argv;//定义全局变量 $arg = $this->argument('action'); $argv[1] = $arg; $argv[2] = $this->option('daemonize') ? '-d' : '';//该参数是以daemon(守护进程)方式启动 global $text_worker; // 创建一个Worker监听2345端口,使用websocket协议通讯 $text_worker = new Worker("websocket://0.0.0.0:2347"); $text_worker->uidConnections = array();//在线用户连接对象 $text_worker->uidInfo = array();//在线用户的用户信息 // 启动4个进程对外提供服务 $text_worker->count = 1; //当启动workerman的时候 触发此方法 $text_worker->onWorkerStart =function(){ //监听一个内部端口,用来接收服务器的消息,转发给浏览器 $inner_text_worker = new Worker('text://127.0.0.1:5678'); $inner_text_worker->onMessage = function($connection_admin, $data) { global $text_worker; // $data数组格式,里面有uid,表示向那个uid的页面推送数据 $buffer = json_decode($data, true); //var_dump($buffer); $to_uid = $buffer['to_uid']; //var_dump($to_uid); // 通过workerman,向uid的页面推送数据 if(isset($text_worker->uidConnections[$to_uid])){ $connection = $text_worker->uidConnections[$to_uid]; $ret = $connection->send($data); } else { var_dump($to_uid . ': not define'); $ret = false; } // 返回推送结果 $connection_admin->send($ret ? 'ok' : 'fail'); }; $inner_text_worker->listen(); // 进程启动后设置一个每10秒运行一次的定时器 Timer::add(10, function(){ global $text_worker; $time_now = time(); foreach($text_worker->connections as $connection) { // 有可能该connection还没收到过消息,则lastMessageTime设置为当前时间 if (empty($connection->lastMessageTime)) { $connection->lastMessageTime = $time_now; continue; } // 上次通讯时间间隔大于心跳间隔,则认为客户端已经下线,关闭连接 if ($time_now - $connection->lastMessageTime > HEARTBEAT_TIME) { var_dump("delete:" . $connection->uid); unset($text_worker->uidConnections["{$connection->uid}"]); $connection->close(); } } }); }; //当浏览器连接的时候触发此函数 $text_worker->onConnect = function($connection){ }; //向用户发送信息的时候触发 //$connection 当前连接的人的信息 $data 发送的数据 $text_worker->onMessage = function($connection,$data){ // 给connection临时设置一个lastMessageTime属性,用来记录上次收到消息的时间 $connection->lastMessageTime = time(); // 其它业务逻辑... $data = json_decode($data, true); if($data['type']=='login'){ $this->create_uid($connection,$data); } if($data['type']=='send_message'){ $this->send_message($connection,$data); } }; //浏览器断开链接的时候触发 $text_worker->onClose = function($connection){}; Worker::runAll(); } //创建uid方法 public function create_uid($connection,$data){ global $text_worker; $connection->uid = $data['uid']; //保存用户的uid $text_worker->uidConnections["{$connection->uid}"] = $connection; //向自己的浏览器返回创建成功的信息 $connection->send("用户:[{$connection->uid}] 创建成功"); } public function send_message($connection,$data){ global $text_worker; if(isset($data['to_uid'])){ // var_dump($data['to_uid']); if(isset($text_worker->uidConnections["{$data['to_uid']}"])){ $to_connection=$text_worker->uidConnections["{$data['to_uid']}"]; $to_connection->send($data['uid'].$data['message']); } } }}
测试数据库连接
use Workerman\Worker;use Workerman\Connection\TcpConnection;require_once __DIR__ . '/vendor/autoload.php';$worker = new Worker('http://0.0.0.0:2345');$worker->onWorkerStart = function($worker){ // 将db实例存储在全局变量中(也可以存储在某类的静态成员中) global $db; $db = new \Workerman\MySQL\Connection( '127.0.0.1',//host '3306',//port 'test_db',//user '***',//password 'test_db'//db_name );};$worker->onMessage = function(TcpConnection $connection, $data){ // 通过全局变量获得db实例 global $db; // 执行SQL $all_tables = $db->query('show tables'); $connection->send(json_encode($all_tables));};// 运行workerWorker::runAll();
来源地址:https://blog.csdn.net/believer_li/article/details/129179586