安装使用Workerman实现websocket前后端通信,后端主动推送消息到前端,实现后端有数据更新时,前端页面自动更新数据。
我使用的是基于Thinkphp5.0的ThinkCMF5.0。
安装:
composer require topthink/think-worker=v1.0.1
启动:
php server.php start -d
public目录下放置的server.php文件,注意里面的配置必须按照你的Worker控制器来:
#!/usr/bin/env php<?php// 调试模式开关define("APP_DEBUG", false);// 定义 APP 命名空间define("APP_NAMESPACE", 'api');// 定义CMF根目录,可更改此目录define('CMF_ROOT', __DIR__ . '/../');// 定义应用目录define('APP_PATH', CMF_ROOT . 'api/');// 定义CMF目录define('CMF_PATH', CMF_ROOT . 'simplewind/cmf/');// 定义网站入口目录define('WEB_ROOT', __DIR__ . '/');// 定义插件目录define('PLUGINS_PATH', __DIR__ . '/plugins/');// 定义扩展目录define('EXTEND_PATH', CMF_ROOT . 'simplewind/extend/');define('VENDOR_PATH', CMF_ROOT . 'simplewind/vendor/');// 定义应用的运行时目录define('RUNTIME_PATH',__DIR__.'/../data/runtime/api/');//define('APP_PATH', __DIR__ . '/api/');define('BIND_MODULE','user/Worker');// 加载框架引导文件require __DIR__ . '/../simplewind/thinkphp/start.php';
woker控制器:
namespace api\user\controller;use think\Db;use think\worker\Server;use Workerman\Worker;use Workerman\Lib\Timer;class WorkerController extends Server{ protected $socket = 'websocket://0.0.0.0:2346'; protected $uidConnections = []; protected $HEARTBEAT_TIME = '60'; public function onMessage($connection, $datas) { $connection->lastMessageTime = time(); $data = json_decode($datas); if (empty($data->uid)) { $connection->close(); return; } $uid = 1;//这里的uid根据自己的情况去验证 if (empty($uid)) { $connection->close(); return; } switch ($data->type) { case 'login': // 保存该用户的输送数据 $this->uidConnections[$uid] = $connection; // $connection->send('发送成功'); break; case 'send': // 发送消息 // $this->sendMessageByUid($uid, $datas); break; } } public function onConnect($connection) { $connection->send('链接成功'); } public function onClose($connection) { if(isset($connection->uid)) { // 连接断开时删除映射 unset($this->uidConnections[$connection->uid]); } } public function onError($connection, $code, $msg) { echo "error $code $msg\n"; } public function onWorkerStart($worker) { // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符 $inner_text_worker = new Worker('text://0.0.0.0:2347');// $inner_text_worker->reusePort=true; $inner_text_worker->onMessage = function ($connection, $buffer) { // $data数组格式,里面有uid,表示向那个uid的页面推送数据 $data = json_decode($buffer, true); $uid = $data['uid']; // 通过workerman,向uid的页面推送数据 $ret = $this->sendMessageByUid($uid, $buffer); // 返回推送结果 $connection->send($ret ? 'ok' : 'fail'); }; // ## 执行监听 ## $inner_text_worker->listen(); Timer::add(10, function()use($worker){ $time_now = time(); foreach($worker->connections as $connection) { // 有可能该connection还没收到过消息,则lastMessageTime设置为当前时间 if (empty($connection->lastMessageTime)) { $connection->lastMessageTime = $time_now; continue; }// $diff_time = $time_now - $connection->lastMessageTime;// $msg = '距离上次通话已经过去'.$diff_time.'秒';// $connection->send($msg); // 上次通讯时间间隔大于心跳间隔,则认为客户端已经下线,关闭连接 if ($time_now - $connection->lastMessageTime > $this->HEARTBEAT_TIME) { $connection->close(); } } }); } // 向所有验证的用户推送数据 public function broadcast($message) { foreach($this->uidConnections as $connection) { $connection->send($message); } } // 针对uid推送数据 public function sendMessageByUid($uid, $message) { if(isset($this->uidConnections[$uid])) { $connection = $this->uidConnections[$uid]; $connection->send($message); return true; } return false; }}
后端主动推送到前端:
$client = stream_socket_client('tcp://127.0.0.1:2347', $errno, $errmsg, 1);// 推送的数据,包含uid字段,表示是给这个uid推送$data_sock = array('uid'=>$merchant, 'type'=>'update');// 发送数据,Text协议需要在数据末尾加上换行符fwrite($client, json_encode($data_sock)."\n");// 读取推送结果// echo fread($client, 8192);
前端代码:
socketStart() { var uid = localStorage.getItem('token') var socket = new WebSocket('wss://www.yechai.com/wss') // 打开Socket socket.onopen = (event) => { socket.send( JSON.stringify({ type: 'login', uid: uid, }) ) } socket.onmessage = (event) => { if (event.data.indexOf('update') != -1) this.fetchData() //收到更新命令,前端更新 // console.log('receive', event.data) } // 监听Socket的关闭 socket.onclose = (event) => { console.log('close', event) setTimeout(() => { this.socketStart() }, 50000) } socket.onerror = function (e) { console.log(e) } this.socket = socket }, socketSend() { var uid = localStorage.getItem('token') this.socket.send( JSON.stringify({ type: 'send', uid: uid, }) ) },
Nginx配置:
location /wss { proxy_pass http://1.1.1.1:2346; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; proxy_read_timeout 180s; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header REMOTE-HOST $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; client_max_body_size 5000M; }
因为主动推送的关系,进程数设置为1:
\vendor\topthink\think-worker\src\Server.php
protected $processes = 1;
为什么?
例如:客户端1连接进程A,客户端2连接进程B,客户端2无法直接通过进程B给客户端1发送数据,因为客户端1属于进程A不属于进程B,B进程控制不到客户端1(要想两个进程之间通讯需要一些进程间通讯手段,可以使用http://doc3.workerman.net/component/channel.html)。所以所有客户端都只能连接同一个进程才能直接互相通讯,为了避免客户端连到不同进程,count设置为1。