文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

聊聊分布式下的WebSocket解决方案

2024-12-10 16:50

关注

发布消息者在系统中发送消息,实时的把消息推送给对应的一个部门下的所有人。

这里面如果是单机应用的情况时,我们可以通过部门的id和用户的id组成一个唯一的key,与应用服务器建立WebSocket长连接,然后就可以接收到发布消息者发送的消息了。

但是真正把项目应用于生产环境中时,我们是不可能就部署一个单机应用的,而是要部署一个集群。

[[343393]]

所以我通过Nginx+两台Tomcat搭建了一个简单的负载均衡集群,作为测试使用

但是问题出现了,我们的客户端浏览器只会与一台服务器建立WebSocket长连接,所以发布消息者在发送消息时,就没法保证所有目标部门的人都能接收到消息(因为这些人连接的可能不是一个服务器)。

本篇文章就是针对于这么一个问题展开讨论,提出一种解决方案,当然解决方案不止一种,那我们开始吧。

WebSocket单体应用介绍

在介绍分布式集群之前,我们先来看一下王子的WebSocket代码实现,先来看java后端代码如下:

  1. import javax.websocket.*; 
  2. import javax.websocket.server.PathParam; 
  3. import javax.websocket.server.ServerEndpoint; 
  4. import com.alibaba.fastjson.JSON; 
  5. import com.alibaba.fastjson.JSONObject;import java.io.IOException; 
  6. import java.util.Map; 
  7. import java.util.concurrent.ConcurrentHashMap; 
  8. @ServerEndpoint("/webSocket/{key}"
  9. public class WebSocket { 
  10.     private static int onlineCount = 0; 
  11.      
  12.     private static Map clients = new ConcurrentHashMap(); 
  13.     private Session session; 
  14.      
  15.     private String key
  16.     @OnOpen 
  17.     public void onOpen(@PathParam("key") String key, Session session) throws IOException { 
  18.         this.key = key
  19.         this.session = session; 
  20.         if (!clients.containsKey(key)) { 
  21.             addOnlineCount();        }        clients.put(key, this); 
  22.         Log.info(key+"已连接消息服务!"); 
  23.     }    @OnClose 
  24.     public void onClose() throws IOException { 
  25.         clients.remove(key);        subOnlineCount();    }    @OnMessage 
  26.     public void onMessage(String message) throws IOException { 
  27.         if(message.equals("ping")){ 
  28.             return ; 
  29.         }        JSONObject jsonTo = JSON.parseObject(message);        String mes = (String) jsonTo.get("message"); 
  30.         if (!jsonTo.get("to").equals("All")){ 
  31.             sendMessageTo(mes, jsonTo.get("to").toString()); 
  32.         }else
  33.             sendMessageAll(mes);        }    }    @OnError 
  34.     public void onError(Session session, Throwable error) { 
  35.         error.printStackTrace();    }    private void sendMessageTo(String message, String To) throws IOException { 
  36.         for (WebSocket item : clients.values()) { 
  37.             if (item.key.contains(To) ) 
  38.                 item.session.getAsyncRemote().sendText(message);        }    }    private void sendMessageAll(String message) throws IOException { 
  39.         for (WebSocket item : clients.values()) { 
  40.             item.session.getAsyncRemote().sendText(message);        }    }    public static synchronized int getOnlineCount() { 
  41.         return onlineCount; 
  42.     }    public static synchronized void addOnlineCount() { 
  43.         WebSocket.onlineCount++;    }    public static synchronized void subOnlineCount() { 
  44.         WebSocket.onlineCount--;    }    public static synchronized Map getClients() { 
  45.         return clients; 
  46.     }} 

示例代码中并没有使用Spring,用的是原生的java web编写的,简单和大家介绍一下里面的方法。

可以看到,在onMessage方法中,我们直接根据客户端发送的消息,进行消息的转发功能,这样在单体消息服务中是没有问题的。

再来看一下js代码

  1. var host = document.location.host; 
  2.     // 获得当前登录科室    var deptCodes='${sessionScope.$UserContext.departmentID}'
  3.     deptCodes=deptCodes.replace(/[\[|\]|\s]+/g, ""); 
  4.     var key = '${sessionScope.$UserContext.userID}'+deptCodes; 
  5.     var lockReconnect = false;  //避免ws重复连接 
  6.     var ws = null;          // 判断当前浏览器是否支持WebSocket    var wsUrl = 'ws://' + host + '/webSocket/'key
  7.     createWebSocket(wsUrl);   //连接ws    function createWebSocket(url) { 
  8.         try{            if('WebSocket' in window){ 
  9.                 ws = new WebSocket(url);            }else if('MozWebSocket' in window){   
  10.                 ws = new MozWebSocket(url);            }else
  11.                   layer.alert("您的浏览器不支持websocket协议,建议使用新版谷歌、火狐等浏览器,请勿使用IE10以下浏览器,360浏览器请使用极速模式,不要使用兼容模式!");  
  12.             }            initEventHandle();        }catch(e){            reconnect(url);            console.log(e); 
  13.         }         }    function initEventHandle() { 
  14.         ws.onclose = function () { 
  15.             reconnect(wsUrl);            console.log("llws连接关闭!"+new Date().toUTCString()); 
  16.         };        ws.onerror = function () { 
  17.             reconnect(wsUrl);            console.log("llws连接错误!"); 
  18.         };        ws.onopen = function () { 
  19.             heartCheck.reset().start();      //心跳检测重置            console.log("llws连接成功!"+new Date().toUTCString()); 
  20.         };        ws.onmessage = function (event) {    //如果获取到消息,心跳检测重置 
  21.             heartCheck.reset().start();      //拿到任何消息都说明当前连接是正常的//接收到消息实际业务处理        ...        };    }    // 监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。    window.onbeforeunload = function() { 
  22.         ws.close(); 
  23.     }      function reconnect(url) { 
  24.         if(lockReconnect) return
  25.         lockReconnect = true
  26.         setTimeout(function () {     //没连接上会一直重连,设置延迟避免请求过多 
  27.             createWebSocket(url);            lockReconnect = false
  28.         }, 2000); 
  29.     }    //心跳检测    var heartCheck = {        timeout: 300000,        //5分钟发一次心跳 
  30.         timeoutObj: null,        serverTimeoutObj: null,        reset: function(){ 
  31.             clearTimeout(this.timeoutObj);            clearTimeout(this.serverTimeoutObj);            return this; 
  32.         },        start: function(){ 
  33.             var self = this;            this.timeoutObj = setTimeout(function(){ 
  34.                 //这里发送一个心跳,后端收到后,返回一个心跳消息,                //onmessage拿到返回的心跳就说明连接正常                ws.send("ping"); 
  35.                 console.log("ping!"
  36.                 self.serverTimeoutObj = setTimeout(function(){//如果超过一定时间还没重置,说明后端主动断开了 
  37.                     ws.close();     //如果onclose会执行reconnect,我们执行ws.close()就行了.如果直接执行reconnect 会触发onclose导致重连两次 
  38.                 }, self.timeout)            }, this.timeout)        }  } 

js部分使用的是原生H5编写的,如果为了更好的兼容浏览器,也可以使用SockJS,有兴趣小伙伴们可以自行百度。

接下来我们就手动的优化代码,实现WebSocket对分布式架构的支持。

解决方案的思考

现在我们已经了解单体应用下的代码结构,也清楚了WebSocket在分布式环境下面临的问题,那么是时候思考一下如何能够解决这个问题了。

我们先来看一看发生这个问题的根本原因是什么。

简单思考一下就能明白,单体应用下只有一台服务器,所有的客户端连接的都是这一台消息服务器,所以当发布消息者发送消息时,所有的客户端其实已经全部与这台服务器建立了连接,直接群发消息就可以了。

换成分布式系统后,假如我们有两台消息服务器,那么客户端通过Nginx负载均衡后,就会有一部分连接到其中一台服务器,另一部分连接到另一台服务器,所以发布消息者发送消息时,只会发送到其中的一台服务器上,而这台消息服务器就可以执行群发操作,但问题是,另一台服务器并不知道这件事,也就无法发送消息了。

现在我们知道了根本原因是生产消息时,只有一台消息服务器能够感知到,所以我们只要让另一台消息服务器也能感知到就可以了,这样感知到之后,它就可以群发消息给连接到它上边的客户端了。

那么什么方法可以实现这种功能呢,王子很快想到了引入消息中间件,并使用它的发布订阅模式来通知所有消息服务器就可以了。

引入RabbitMQ解决分布式下的WebSocket问题

在消息中间件的选择上,王子选择了RabbitMQ,原因是它的搭建比较简单,功能也很强大,而且我们只是用到它群发消息的功能。

RabbitMQ有一个广播模式(fanout),我们使用的就是这种模式。

首先我们写一个RabbitMQ的连接类:

  1. import com.rabbitmq.client.Connection
  2. import com.rabbitmq.client.ConnectionFactory; 
  3. import java.io.IOException; 
  4. import java.util.concurrent.TimeoutException; 
  5. public class RabbitMQUtil { 
  6.     private static Connection connection
  7.      
  8.     public static Connection getConnection() { 
  9.         if (connection != null&&connection.isOpen()) { 
  10.             return connection
  11.         }        ConnectionFactory factory = new ConnectionFactory(); 
  12.         factory.setVirtualHost("/"); 
  13.         factory.setHost("192.168.220.110"); // 用的是虚拟IP地址 
  14.         factory.setPort(5672); 
  15.         factory.setUsername("guest"); 
  16.         factory.setPassword("guest"); 
  17.         try { 
  18.             connection = factory.newConnection(); 
  19.         } catch (IOException e) { 
  20.             e.printStackTrace(); 
  21.         } catch (TimeoutException e) { 
  22.             e.printStackTrace(); 
  23.         } 
  24.         return connection
  25.     } 

这个类没什么说的,就是获取MQ连接的一个工厂类。

然后按照我们的思路,就是每次服务器启动的时候,都会创建一个MQ的消费者监听MQ的消息,王子这里测试使用的是Servlet的监听器,如下:

  1. import javax.servlet.ServletContextEvent; 
  2. import javax.servlet.ServletContextListener; 
  3. public class InitListener implements ServletContextListener { 
  4.     @Override 
  5.     public void contextInitialized(ServletContextEvent servletContextEvent) { 
  6.         WebSocket.init();    }    @Override 
  7.     public void contextDestroyed(ServletContextEvent servletContextEvent) { 
  8.     }} 

记得要在Web.xml中配置监听器信息

  1. "1.0" encoding="UTF-8"?> 
  2. "http://xmlns.jcp.org/xml/ns/javaee" 
  3.          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
  4.          xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_4_0.xsd" 
  5.          version="4.0"
  6.      
  7.         InitListener 
  8.      
  9.  

WebSocket中增加init方法,作为MQ消费者部分

  1. public  static void init() { 
  2.         try {            Connection connection = RabbitMQUtil.getConnection();            Channel channel = connection.createChannel();            //交换机声明(参数为:交换机名称;交换机类型) 
  3.             channel.exchangeDeclare("fanoutLogs",BuiltinExchangeType.FANOUT); 
  4.             //获取一个临时队列 
  5.             String queueName = channel.queueDeclare().getQueue();            //队列与交换机绑定(参数为:队列名称;交换机名称;routingKey忽略) 
  6.             channel.queueBind(queueName,"fanoutLogs",""); 
  7.             //这里重写了DefaultConsumer的handleDelivery方法,因为发送的时候对消息进行了getByte(),在这里要重新组装成String 
  8.             Consumer consumer = new DefaultConsumer(channel) {                @Override                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                    super.handleDelivery(consumerTag, envelope, properties, body); 
  9.                     String message = new String(body,"UTF-8"); 
  10.                     System.out.println(message);            //这里可以使用WebSocket通过消息内容发送消息给对应的客户端 
  11.                 }            };            //声明队列中被消费掉的消息(参数为:队列名称;消息是否自动确认;consumer主体) 
  12.             channel.basicConsume(queueName,true,consumer); 
  13.             //这里不能关闭连接,调用了消费方法后,消费者会一直连接着rabbitMQ等待消费 
  14.         } catch (IOException e) {            e.printStackTrace();        }    } 

同时在接收到消息时,不是直接通过WebSocket发送消息给对应客户端,而是发送消息给MQ,这样如果消息服务器有多个,就都会从MQ中获得消息,之后通过获取的消息内容再使用WebSocket推送给对应的客户端就可以了。

WebSocket的onMessage方法增加内容如下:

  1. try { 
  2.             //尝试获取一个连接 
  3.             Connection connection = RabbitMQUtil.getConnection();            //尝试创建一个channel 
  4.             Channel channel = connection.createChannel();            //声明交换机(参数为:交换机名称; 交换机类型,广播模式) 
  5.             channel.exchangeDeclare("fanoutLogs", BuiltinExchangeType.FANOUT); 
  6.             //消息发布(参数为:交换机名称; routingKey,忽略。在广播模式中,生产者声明交换机的名称和类型即可) 
  7.             channel.basicPublish("fanoutLogs",""null,msg.getBytes("UTF-8")); 
  8.             System.out.println("发布消息"); 
  9.             channel.close();        } catch (IOException |TimeoutException e) { 
  10.             e.printStackTrace(); 
  11.         } 

增加后删除掉原来的Websocket推送部分代码。

这样一整套的解决方案就完成了。

总结

到这里,我们就解决了分布式下WebSocket的推送消息问题。

我们主要是引入了RabbitMQ,通过RabbitMQ的发布订阅模式,让每个消息服务器启动的时候都去订阅消息,而无论哪台消息服务器在发送消息的时候都会发送给MQ,这样每台消息服务器就都会感知到发送消息的时间,从而再通过Websocket发送给客户端。

大体流程就是这样,那么小伙伴们有没有想过,如果RabbitMQ挂掉了几分钟,之后重启了,消费者是否可以重新连接到RabbitMQ?是否还能正常接收消息呢?

生产环境下,这个问题是必须考虑的。

这里已经测试过,消费者是支持自动重连的,所以我们可以放心的使用这套架构来解决此问题。

本文到这里就结束了,欢迎各位小伙伴留言讨论,一起学习,一起进步。

来源:今日头条内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯