文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Nginx + Docker 手动集群方式运行 EMQ

2023-06-03 17:55

关注

EMQ X 在支持客户的过程中,了解到有客户使用 Nginx 做负载均衡,Docker 容器手动加入集群的方式运行 EMQ 集群,现将主要过程记录下来。

业务需求

  • 使用 Nginx 作为反向代理
  • Nginx 需要提前分配好代理 server 的地址
  • 使用 Docker 容器运行 EMQ
  • EMQ 自动重启
  • EMQ 重启后自动集群

配置

Nginx 配置

$ cat /etc/nginx/tcpstream.conf## tcp LB  and SSL passthrough for backend ##stream {    upstream mqtt_broker {        server 127.0.0.1:21871; #max_fails=5 fail_timeout=30s;        server 127.0.0.1:21872; #max_fails=5 fail_timeout=30s;        server 127.0.0.1:21873; #max_fails=5 fail_timeout=30s;        server 127.0.0.1:21874; #max_fails=5 fail_timeout=30s;        server 127.0.0.1:21875; #max_fails=5 fail_timeout=30s;        server 127.0.0.1:21881; #max_fails=5 fail_timeout=30s;        server 127.0.0.1:21891; #max_fails=5 fail_timeout=30s;        server 127.0.0.1:21882; #max_fails=5 fail_timeout=30s;        server 127.0.0.1:21892; #max_fails=5 fail_timeout=30s;        server 127.0.0.1:21883; #max_fails=5 fail_timeout=30s;        server 127.0.0.1:21893; #max_fails=5 fail_timeout=30s;        server 127.0.0.1:21884; #max_fails=5 fail_timeout=30s;        server 127.0.0.1:21894; #max_fails=5 fail_timeout=30s;        server 127.0.0.1:21885; #max_fails=5 fail_timeout=30s;        server 127.0.0.1:21895; #max_fails=5 fail_timeout=30s;    }log_format basic '$proxy_protocol_addr - $remote_addr [$time_local] '                 '$protocol $status $bytes_sent $bytes_received '                 '$session_time "$upstream_addr" '                 '"$upstream_bytes_sent" "$upstream_bytes_received" "$upstream_connect_time"';    access_log /var/log/nginx/access.log basic;    error_log  /var/log/nginx/error.log;    server {        listen 8884 ssl; # proxy_protocol;        proxy_next_upstream on;        #proxy_bind $remote_addr transparent;        proxy_ssl off;        proxy_pass mqtt_broker;        proxy_protocol on;        #ssl_on;        # adding some extra proxy settings        proxy_timeout 350s;        #proxy_buffer_size 128k;        #ssl_certificate /etc/nginx/certs/solace.pem;        #ssl_certificate_key /etc/nginx/certs/solace.pem;        ssl_certificate /etc/nginx/certs/cert.pem;        ssl_certificate_key /etc/nginx/certs/key.pem;        #ssl_verify_client off;        ssl_protocols TLSv1 TLSv1.1 TLSv1.2;        ssl_ciphers HIGH:!aNULL:!MD5;    }}

Docker 配置

客户自行编译的 Docker image,并非使用 EMQ 提供的官方镜像。

Dockerfile 目录如下:

$ ll /opt/Docker/总用量 28-rw-r--r--  1 alexeyp emq      620 10月 22 17:26 Dockerfilelrwxrwxrwx  1 alexeyp emq       13 10月 24 13:59 emqttd -> emqttd.2.3.11drwxr-xr-x 10 alexeyp emq      110 10月 24 14:27 emqttd.2.3.11-rwxr-xr-x  1 alexeyp emq     3463 10月 26 05:03 StartEmqInstance.sh-rwxr-xr-x  1 alexeyp alexeyp  270 10月 25 10:46 status.sh

Dockerfile:

$ cat DockerfileFROM centos:latestRUN yum -y updateEXPOSE 60000-65000WORKDIR /opt/emqttdADD ./emqttd /opt/emqttdADD ./vsparc.rpm /tmp/vsparc.rpmADD ./StartEmqInstance.sh /opt/emqttd/StartEmqInstance.shRUN yum install -y epel-releaseRUN yum install -y which less sed net-tools telnet gtest /tmp/vsparc.rpmENV TZ Australia/MelbourneCMD bash /opt/emqttd/StartEmqInstance.sh && bash

可以看到 Docker 容器启动后会执行一个 StartEmqInstance.sh 的脚本,查看该脚本:

$ cat StartEmqInstance.sh#!/bin/bashDIR=$(dirname $0)HOSTNAME=$(hostname -s)function adjust_instance(){    local INST=$1    local INST_ROOT=$2    cat $INST_ROOT/etc/emq.conf | \       sed -re "s/^node\.name\s*=.*$/node.name = emq$INST@127.0.0.1/" | \       #sed -re "s/^cluster\.name\s*=.*$/cluster.name = $HOSTNAME/" | \       sed -re "s/^listener\.tcp\.external\s*=.*$/listener.tcp.external = 0.0.0.0:6188$INST/" | \       sed -re "s/^listener\.tcp\.external1\s*=.*$/listener.tcp.external1 = 0.0.0.0:6189$INST/" | \       sed -re "s/^listener\.tcp\.external2\s*=.*$/listener.tcp.external2 = 0.0.0.0:6187$INST/" | \       sed -re "s/^listener\.tcp\.internal\s*=.*$/listener.tcp.internal = 127.0.0.1:6298$INST/" | \       sed -re "s/^listener\.ssl\.external\s*=.*$/listener.ssl.external = 6288$INST/" | \       sed -re "s/^listener\.ws\.external\s*=.*$/listener.ws.external = 6208$INST/" | \       sed -re "s/^listener\.wss\.external\s*=.*$/listener.ws.external = 6308$INST/" | \       sed -re "s/^listener\.api\.mgmt\s*=.*$/listener.api.mgmt = 6408$INST/" | \       sed -re "s/^(##\s)?listener\.tcp\.external\.proxy_protocol\s=.*$/listener.tcp.external.proxy_protocol = on/" | \       sed -re "s/^(##\s)?listener\.tcp\.external1\.proxy_protocol\s=.*$/listener.tcp.external1.proxy_protocol = on/" | \       sed -re "s/^(##\s)?listener\.tcp\.external2\.proxy_protocol\s=.*$/listener.tcp.external2.proxy_protocol = on/" | \       sed -re "s/^(##\s)?listener\.tcp\.external\.proxy_protocol_timeout\s=.*$/listener.tcp.external.proxy_protocol_timeout = 30s/" | \       sed -re "s/^(##\s)?listener\.tcp\.external1\.proxy_protocol_timeout\s=.*$/listener.tcp.external1.proxy_protocol_timeout = 30s/" | \       sed -re "s/^(##\s)?listener\.tcp\.external2\.proxy_protocol_timeout\s=.*$/listener.tcp.external2.proxy_protocol_timeout = 30s/" | \       sed -re "s/^(##\s)?node.dist_listen_min\s*=.*$/node.dist_listen_min = 6000$INST/" | \       sed -re "s/^(##\s)?node.dist_listen_max\s*=.*$/node.dist_listen_max = 6000$INST/" | \       cat - > $INST_ROOT/etc/emq.conf.new    mv $INST_ROOT/etc/emq.conf.new $INST_ROOT/etc/emq.conf}function cluster_instance(){    local INST=$1    for DEST in 1 2 3 4 5; do        if [ $DEST == $INST ]; then            continue;        fi        DEST_NODE="emq$DEST@127.0.0.1"        RESULT=$(/opt/emqttd/bin/emqttd_ctl cluster join $DEST_NODE 2>&1)        echo "$RESULT"        echo "$RESULT" | grep -E 'successfully|already' > /dev/null        RC=$?        [ $RC == 0 ] && break    done}cd "$DIR"if [ "$EMQ_INSTANCE_NUMBER" == "" ]; then    echo "Environment variable EMQ_INSTANCE_NUMBER(1..10) is not set."    echo "eMQ instance name is not configured."    exit 1else    adjust_instance $EMQ_INSTANCE_NUMBER $DIRfifunction run_application(){    local CMD="$1"    local RC=1    while [ $RC != 0 ]; do        $CMD        RC=$?        echo "### Exited: $CMD"        echo "### rc = $RC"        #[ $RC != 0 ] && sleep 3        RC=1    done    echo "### Done: $CMD"}function start_node(){    bin/emqttd start    STARTED=0    while [ $STARTED == 0 ]; do        sleep 1        /opt/emqttd/bin/emqttd_ctl status | grep "is running"        [ $? == 0 ] && break    done    cluster_instance $EMQ_INSTANCE_NUMBER > /tmp/cluster_instance.log}start_nodesleep 5run_application "/usr/local/bin/emqtt-stats-collector" &#waitIDLE_TIME=0while [[ $IDLE_TIME -lt 5 ]]do    IDLE_TIME=$((IDLE_TIME+1))    if [[ ! -z "$( /opt/emqttd/bin/emqttd_ctl status|grep 'is running'|awk '{print $1}')" ]]; then        IDLE_TIME=0    else        echo "['$(date -u +"%Y-%m-%dT%H:%M:%SZ")']:emqttd not running, waiting for recovery in $((60-IDLE_TIME*5)) seconds"    fi    sleep 5doneecho "['$(date -u +"%Y-%m-%dT%H:%M:%SZ")']:emqttd exit abnormally"exit 1

脚本内容稍多而且有些复杂,需要结合 start.sh 脚本和 etc/emq.conf一起看

$ cat start.sh#!/bin/bashfor INST in 1 2 3 4 5do    docker ps | grep -E "\sinstance_$INST$"    if [ $? != 0 ]; then        #docker run -itd ---ulimit nofile=1048576 -restart=always -v /opt/Docker/emqtt/emq$INST/data/mnesia:/opt/emqttd/data/mnesia  -e EMQ_INSTANCE_NUMBER=$INST --name=instance_$INST --network host emq:test &        docker run -itd --ulimit nofile=1048576 -e EMQ_INSTANCE_NUMBER=$INST --name=instance_$INST --network host emq:latest &    fidonewait

EMQ 配置

etc/emq.conf`的全文就不贴出来了,主要是增加了两个 tcp 监听端口,并且关闭了`listener.tcp.external.tune_buffer$ cat etc/emq.conf......##--------------------------------------------------------------------listener.tcp.external = 0.0.0.0:21881listener.tcp.external.acceptors = 16listener.tcp.external.max_clients = 512000listener.tcp.external.access.1 = allow alllistener.tcp.external.proxy_protocol = onlistener.tcp.external.proxy_protocol_timeout = 30slistener.tcp.external.backlog = 1024listener.tcp.external.send_timeout = 15slistener.tcp.external.send_timeout_close = on## listener.tcp.external.tune_buffer = onlistener.tcp.external.nodelay = truelistener.tcp.external.reuseaddr = true##--------------------------------------------------------------------listener.tcp.external1 = 0.0.0.0:21891listener.tcp.external1.acceptors = 16listener.tcp.external1.max_clients = 512000listener.tcp.external1.access.1 = allow alllistener.tcp.external1.proxy_protocol = onlistener.tcp.external1.proxy_protocol_timeout = 30slistener.tcp.external1.backlog = 1024listener.tcp.external1.send_timeout = 15slistener.tcp.external1.send_timeout_close = on## listener.tcp.external1.tune_buffer = onlistener.tcp.external1.nodelay = truelistener.tcp.external1.reuseaddr = true##--------------------------------------------------------------------listener.tcp.external2 = 0.0.0.0:21871listener.tcp.external2.acceptors = 16listener.tcp.external2.max_clients = 512000listener.tcp.external2.access.1 = allow alllistener.tcp.external2.proxy_protocol = onlistener.tcp.external2.proxy_protocol_timeout = 30slistener.tcp.external2.backlog = 1024listener.tcp.external2.send_timeout = 15slistener.tcp.external2.send_timeout_close = on## listener.tcp.external2.tune_buffer = onlistener.tcp.external2.nodelay = truelistener.tcp.external2.reuseaddr = true......

业务分析

Docker 容器初始化

Docker 容器创建之后,StartEmqInstance.sh执行adjust_instance()etc/emq.conf中监听的端口修改为Nginx 的代理 server

 sed -re "s/^node\.name\s*=.*$/node.name = emq$INST@127.0.0.1/" | \ sed -re "s/^listener\.tcp\.external\s*=.*$/listener.tcp.external = 0.0.0.0:6188$INST/"  sed -re "s/^listener\.tcp\.external1\s*=.*$/listener.tcp.external1 = 0.0.0.0:6189$INST/"  sed -re "s/^listener\.tcp\.external2\s*=.*$/listener.tcp.external2 = 0.0.0.0:6187$INST/"  sed -re "s/^listener\.tcp\.internal\s*=.*$/listener.tcp.internal = 127.0.0.1:6298$INST/"

并通过 join 命令来实现集群功能

function cluster_instance(){    local INST=$1    for DEST in 1 2 3 4 5; do        if [ $DEST == $INST ]; then            continue;        fi        DEST_NODE="emq$DEST@127.0.0.1"        RESULT=$(/opt/emqttd/bin/emqttd_ctl cluster join $DEST_NODE 2>&1)        echo "$RESULT"        echo "$RESULT" | grep -E 'successfully|already' > /dev/null        RC=$?        [ $RC == 0 ] && break    done}

循环检查 EMQ 的状态,当 EMQ 停止了之后退出容器

IDLE_TIME=0while [[ $IDLE_TIME -lt 5 ]]do    IDLE_TIME=$((IDLE_TIME+1))    if [[ ! -z "$( /opt/emqttd/bin/emqttd_ctl status|grep 'is running'|awk '{print $1}')" ]]; then        IDLE_TIME=0    else        echo "['$(date -u +"%Y-%m-%dT%H:%M:%SZ")']:emqttd not running, waiting for recovery in $((60-IDLE_TIME*5)) seconds"    fi    sleep 5doneecho "['$(date -u +"%Y-%m-%dT%H:%M:%SZ")']:emqttd exit abnormally"exit 1

访问

客户端通过 SSL 方式连接 地址,Nginx 将连接以 TCP 方式负载到 EMQ 节点。

PS:关于 Nginx 如何反向代理 tcp 和 ssl 的设置,可以参考 EMQ X 消息服务器 Nginx 反向代理

自动重启和自动集群

容器启动后通过StartEmqInstance.sh脚本查询 EMQ 的状态,当 EMQ 停止时退出容器,配合--restart=always来达到重启容器的目的。

EMQ 将集群信息储存在data/mnesia中,将容器的中的目录映射到宿主机,当容器重启之后会读取宿主机映射的相关目录,实现重启后自动集群。

存在问题

  • Docker 的 host 网络模式使用宿主机的网络,当宿主机有其他业务在执行的时候,容易出现端口冲突

解决方案

  • 修改/proc/sys/net/ipv4/ip_local_port_range指定系统分配的端口为 1024 60000,然后将 EMQ 的业务端口分配为 60000 之后的端口

实践案例

建议使用 kubernetes 来编排 docker 容器:

  • EMQ 可以通过kube-apiserver来实现自动集群的功能。
  • 该客户目前只是在单机部署docker集群,使用 kubernetes 可以轻易实现多个节点之间部署集群。
  • kubernetes 的deployment可以监控emqx pod的状态,实现自动重启、弹性扩容等功能。
  • 每个emqx pod都有独立的虚拟 IP,不会出现端口冲突的问题。
  • kubernetes 的Service可以实现固定 IP 和负载均衡的需求,在 Service 创建的请求中,可以通过设置 spec.clusterIP 字段来指定自己的集群 IP 地址,将 Nginx 的代理 server 设置成clusterIP即可,Service可自行实现负载均衡。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 资料下载
  • 历年真题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯