php使用mqtt
说明:本文旨在说明php关于mqtt的基本用法:基本发布和订阅、共享订阅。
采用的php采用的框架是easyswoole3.5.1(php+swoole),mqtt软件用的是emqx4.3,采用的php软件库是simps/mqtt 1.4(采用它的原因是,它是第一个支持5.0协议的软件库)。
如果是第一次使用mqtt 建议先看emqx文档。
easyswoole文档:https://www.easyswoole.com/
emqx文档:https://www.emqx.io/docs/zh/v4.3/
simps/mqtt 文档:https://packagist.p2hp.com/packages/simps/mqtt
mqtt基础配置
emqx安装后:基础配置文件在/etc/eqmx/emqx.conf
参数修改说明:
//最大报文尺寸(一次上报的数据量)mqtt.max_packet_size =10MB//飞行窗口大小。飞行窗口用于存储未被应答的 QoS 1 和 QoS 2 消息。,实际是储存未被消费的消息,增大可以提高处理速度zone.external.max_inflight = 320
php客户端配置
//mqtt基础配置 'clientMqtt' => [ 'clientConfig' => [//订阅方配置 'userName' => '', // 用户名(可以不填:订阅主题的时候在填写) 'password' => '', // 密码(可以不填:订阅主题的时候在填写) 'clientId' => '', // 客户端id(可以不填:订阅主题的时候在填写) 'keepAlive' => 50, // 心跳周期 默认0秒,设置成0代表禁用 'protocolName' => 'MQTT', // 协议名,默认为MQTT(3.1.1版本),也可为MQIsdp(3.1版本) 'protocolLevel' => 4, // 协议等级,MQTT3.1.1版本为4,5.0版本为5,MQIsdp为3 'properties' => [ 'session_expiry_interval' => 60, 'receive_maximum' => 65536, 'topic_alias_maximum' => 65536, ], // MQTT5 中所需要的属性(只有5的时候才填写,不是5绝对不能写) 'delay' => 3000, // 重连时的延迟时间 (毫秒) 'maxAttempts' => -1, // 最大重连次数。默认-1,表示不限制 'swooleConfig' => [ 'open_mqtt_protocol' => true, 'package_max_length' => 5 * 1024 * 1024//包大小 ] ], //下面的作为客户端,mqtt发布方使用 'host' => '127.0.0.1', 'port' => 'xxxx',//创建客户端(订阅主题)用的 'appID' => 'XXX', 'appSecret' => 'XXXX', ],
php创建主题
namespace xxx\xxxx;use EasySwoole\Component\Process\AbstractProcess;use EasySwoole\EasySwoole\Config;use Simps\MQTT\Client;use Simps\MQTT\Config\ClientConfig;use Simps\MQTT\Hex\ReasonCode;use Simps\MQTT\Protocol\Types;class Test2Subscribe extends AbstractProcess{ public function run($arg) { go(function () {//采用协程方案 //获取mqtt基础配置 $clientConfig ='';//获取上面的配置参数 $clientConfig['clientId'] = 'user2';//定义客户端 $clientConfig['userName'] = 'user2';//定义用户名 //定义配置参数 $configObj = new ClientConfig($clientConfig); //客户端链接 $client = new Client($getConf['host'], $getConf['port'], $configObj); $client->connect(); //创建主题 // No Local:是否想自己转发自己发布的消息:true为不转发、 //Retain As Published:服务端向客户端转发消息时是否要保留其中的 Retain 标识 //Retain Handling 订阅建立时服务端是否向客户端发送保留消息 //No Local 、Retain As Published、Retain Handling 只支持mqtt5协议 if ($clientConfig['protocolLevel'] == 5) {//mqtt5协议 $topics['$queue/test1'] = [ 'qos' => 1, 'no_local' => true, 'retain_as_published' => true, 'retain_handling' => 2, ]; } else {//mqtt3.1.1或3协议 //主题名:qos $topics['$queue/test1'] = 1; } //主题说明:一个主题可以被多个用户订阅,但是他们的用户id不能相同 //$share/g2/topic 为共享订阅的标记,用于平台方的负载均衡,EMQX 会向带topic的两个群组 g1 和 g2 同时发送 msg1 但是各组中只会有一个接收到信息 //$queue/topic 为特殊的共享订阅(不带群组的共享订阅),多个用户订阅,但只会有一个用户收到消息 // //订阅主题 $res = $client->subscribe($topics); //时间 $timeSincePing = time(); while (true) { try { $buffer = $client->recv(); if ($buffer && $buffer !== true) { //处理消息 if (isset($buffer['message'])) { } // QoS1 PUBACK if ($buffer['type'] === Types::PUBLISH && $buffer['qos'] === 1) {$client->send( [ 'type' => Types::PUBACK, 'message_id' => $buffer['message_id'], ], false); } if ($buffer['type'] === Types::DISCONNECT) {echo sprintf( "Broker is disconnected, The reason is %s [%d]\n", ReasonCode::getReasonPhrase($buffer['code']), $buffer['code']);$client->close($buffer['code']);break; } } //心跳 if ($timeSincePing <= (time() - $client->getConfig()->getKeepAlive())) { $buffer = $client->ping(); if ($buffer) {//echo 'send ping success' . PHP_EOL;$timeSincePing = time(); } } } catch (\Throwable $e) { throw $e; } } }); }}
来源地址:https://blog.csdn.net/qq_34892981/article/details/128634909