文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Thinkphp5.0 安装使用Workerman实现websocket前后端通信,后端主动推送消息到前端

2023-08-31 21:55

关注

安装使用Workerman实现websocket前后端通信,后端主动推送消息到前端,实现后端有数据更新时,前端页面自动更新数据。
我使用的是基于Thinkphp5.0的ThinkCMF5.0。
安装:

composer require topthink/think-worker=v1.0.1

启动:

php server.php start -d

public目录下放置的server.php文件,注意里面的配置必须按照你的Worker控制器来:

#!/usr/bin/env php<?php// 调试模式开关define("APP_DEBUG", false);// 定义 APP 命名空间define("APP_NAMESPACE", 'api');// 定义CMF根目录,可更改此目录define('CMF_ROOT', __DIR__ . '/../');// 定义应用目录define('APP_PATH', CMF_ROOT . 'api/');// 定义CMF目录define('CMF_PATH', CMF_ROOT . 'simplewind/cmf/');// 定义网站入口目录define('WEB_ROOT', __DIR__ . '/');// 定义插件目录define('PLUGINS_PATH', __DIR__ . '/plugins/');// 定义扩展目录define('EXTEND_PATH', CMF_ROOT . 'simplewind/extend/');define('VENDOR_PATH', CMF_ROOT . 'simplewind/vendor/');// 定义应用的运行时目录define('RUNTIME_PATH',__DIR__.'/../data/runtime/api/');//define('APP_PATH', __DIR__ . '/api/');define('BIND_MODULE','user/Worker');// 加载框架引导文件require __DIR__ . '/../simplewind/thinkphp/start.php';

woker控制器:

namespace api\user\controller;use think\Db;use think\worker\Server;use Workerman\Worker;use Workerman\Lib\Timer;class WorkerController extends Server{    protected $socket = 'websocket://0.0.0.0:2346';    protected $uidConnections = [];    protected $HEARTBEAT_TIME = '60';        public function onMessage($connection, $datas)    {            $connection->lastMessageTime = time();            $data = json_decode($datas);            if (empty($data->uid)) {                $connection->close();                return;            }            $uid = 1;//这里的uid根据自己的情况去验证            if (empty($uid)) {                $connection->close();                return;            }            switch ($data->type) {                case 'login':                    // 保存该用户的输送数据                    $this->uidConnections[$uid] = $connection;                    // $connection->send('发送成功');                    break;                case 'send':                    // 发送消息                    // $this->sendMessageByUid($uid, $datas);                    break;            }    }        public function onConnect($connection)    {         $connection->send('链接成功');    }        public function onClose($connection)    {        if(isset($connection->uid))        {            // 连接断开时删除映射            unset($this->uidConnections[$connection->uid]);        }    }        public function onError($connection, $code, $msg)    {        echo "error $code $msg\n";    }        public function onWorkerStart($worker)    {        // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符        $inner_text_worker = new Worker('text://0.0.0.0:2347');//        $inner_text_worker->reusePort=true;        $inner_text_worker->onMessage = function ($connection, $buffer) {            // $data数组格式,里面有uid,表示向那个uid的页面推送数据            $data = json_decode($buffer, true);            $uid = $data['uid'];            // 通过workerman,向uid的页面推送数据            $ret = $this->sendMessageByUid($uid, $buffer);            // 返回推送结果            $connection->send($ret ? 'ok' : 'fail');        };        // ## 执行监听 ##        $inner_text_worker->listen();        Timer::add(10, function()use($worker){            $time_now = time();            foreach($worker->connections as $connection) {                // 有可能该connection还没收到过消息,则lastMessageTime设置为当前时间                if (empty($connection->lastMessageTime)) {                    $connection->lastMessageTime = $time_now;                    continue;                }//                $diff_time = $time_now - $connection->lastMessageTime;//                $msg = '距离上次通话已经过去'.$diff_time.'秒';//                $connection->send($msg);                // 上次通讯时间间隔大于心跳间隔,则认为客户端已经下线,关闭连接                if ($time_now - $connection->lastMessageTime > $this->HEARTBEAT_TIME) {                    $connection->close();                }            }        });    }    // 向所有验证的用户推送数据    public function broadcast($message)    {        foreach($this->uidConnections as $connection)        {            $connection->send($message);        }    }    // 针对uid推送数据    public function sendMessageByUid($uid, $message)    {        if(isset($this->uidConnections[$uid]))        {            $connection = $this->uidConnections[$uid];            $connection->send($message);            return true;        }        return false;    }}

后端主动推送到前端:

$client = stream_socket_client('tcp://127.0.0.1:2347', $errno, $errmsg, 1);// 推送的数据,包含uid字段,表示是给这个uid推送$data_sock = array('uid'=>$merchant, 'type'=>'update');// 发送数据,Text协议需要在数据末尾加上换行符fwrite($client, json_encode($data_sock)."\n");// 读取推送结果// echo fread($client, 8192);

前端代码:

socketStart() {        var uid = localStorage.getItem('token')        var socket = new WebSocket('wss://www.yechai.com/wss')        // 打开Socket        socket.onopen = (event) => {          socket.send(            JSON.stringify({              type: 'login',              uid: uid,            })          )        }        socket.onmessage = (event) => {          if (event.data.indexOf('update') != -1) this.fetchData() //收到更新命令,前端更新          // console.log('receive', event.data)        }        // 监听Socket的关闭        socket.onclose = (event) => {          console.log('close', event)          setTimeout(() => {            this.socketStart()          }, 50000)        }        socket.onerror = function (e) {          console.log(e)        }        this.socket = socket      },      socketSend() {        var uid = localStorage.getItem('token')        this.socket.send(          JSON.stringify({            type: 'send',            uid: uid,          })        )      },

Nginx配置:

 location /wss {        proxy_pass http://1.1.1.1:2346;        proxy_http_version 1.1;        proxy_set_header Upgrade $http_upgrade;        proxy_set_header Connection "upgrade";        proxy_read_timeout 180s;        proxy_set_header Host $host;        proxy_set_header X-Real-IP $remote_addr;        proxy_set_header REMOTE-HOST $remote_addr;        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;        client_max_body_size 5000M;    }

因为主动推送的关系,进程数设置为1:
\vendor\topthink\think-worker\src\Server.php

protected $processes = 1;

为什么?
例如:客户端1连接进程A,客户端2连接进程B,客户端2无法直接通过进程B给客户端1发送数据,因为客户端1属于进程A不属于进程B,B进程控制不到客户端1(要想两个进程之间通讯需要一些进程间通讯手段,可以使用http://doc3.workerman.net/component/channel.html)。所以所有客户端都只能连接同一个进程才能直接互相通讯,为了避免客户端连到不同进程,count设置为1。

来源地址:https://blog.csdn.net/jeesr/article/details/129485774

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯