图片
简单的概括一下:如果我们的项目是分布式环境,登录的用户被Nginx的反向代理分配到多个不同服务器,那么在其中一个服务器建立了WebSocket连接的用户如何给在另外一个服务器上建立了WebSocket连接的用户发送消息呢?
今天就来解答一下球友的问题:其实,要解决这个问题就需要实现分布式WebSocket,而分布式WebSocket一般可以通过以下两种方案来实现:
- 将消息(<用户id,消息内容>)统一推送到一个消息队列(Redis、Kafka等)的的topic,然后每个应用节点都订阅这个topic,在接收到WebSocket消息后取出这个消息的“消息接收者的用户ID/用户名”,然后再比对自身是否存在相应用户的连接,如果存在则推送消息,否则丢弃接收到的这个消息(这个消息接收者所在的应用节点会处理)
- 在用户建立WebSocket连接后,使用Redis缓存记录用户的WebSocket建立在哪个应用节点上,然后同样使用消息队列将消息推送到接收者所在的应用节点上面(实现上比方案一要复杂,但是网络流量会更低)
实现方案
下面将以第一种方案来具体实现,实现方式如下
已加入星球的小伙伴如需案例源码联系陈某!
1. 定义一个WebSocket Channel枚举类
public enum WebSocketChannelEnum {
//测试使用的简易点对点聊天
CHAT("CHAT", "测试使用的简易点对点聊天", "/topic/reply");
WebSocketChannelEnum(String code, String description, String subscribeUrl) {
this.code = code;
this.description = description;
this.subscribeUrl = subscribeUrl;
}
private String code;
private String description;
private String subscribeUrl;
public String getCode() {
return code;
}
public String getDescription() {
return description;
}
public String getSubscribeUrl() {
return subscribeUrl;
}
public static WebSocketChannelEnum fromCode(String code){
if(StringUtils.isNoneBlank(code)){
for(WebSocketChannelEnum channelEnum : values()){
if(channelEnum.code.equals(code)){
return channelEnum;
}
}
}
return null;
}
}
2. 配置基于Redis的消息队列
需要注意的是,在大中型正式项目中并不推荐使用Redis实现的消息队列,因为经过测试它并不是特别可靠,所以应该考虑使用Kafka、rabbitMQ等专业的消息队列中间件
@Configuration
@ConditionalOnClass({JedisCluster.class})
public class RedisConfig {
@Value("${spring.redis.timeout}")
private String timeOut;
@Value("${spring.redis.cluster.nodes}")
private String nodes;
@Value("${spring.redis.cluster.max-redirects}")
private int maxRedirects;
@Value("${spring.redis.jedis.pool.max-active}")
private int maxActive;
@Value("${spring.redis.jedis.pool.max-wait}")
private int maxWait;
@Value("${spring.redis.jedis.pool.max-idle}")
private int maxIdle;
@Value("${spring.redis.jedis.pool.min-idle}")
private int minIdle;
@Value("${spring.redis.message.topic-name}")
private String topicName;
@Bean
public JedisPoolConfig jedisPoolConfig(){
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(maxActive);
config.setMaxIdle(maxIdle);
config.setMinIdle(minIdle);
config.setMaxWaitMillis(maxWait);
return config;
}
@Bean
public RedisClusterConfiguration redisClusterConfiguration(){
RedisClusterConfiguration configuration = new RedisClusterConfiguration(Arrays.asList(nodes));
configuration.setMaxRedirects(maxRedirects);
return configuration;
}
@Bean
public JedisConnectionFactory jedisConnectionFactory(RedisClusterConfiguration configuration,JedisPoolConfig jedisPoolConfig){
return new JedisConnectionFactory(configuration,jedisPoolConfig);
}
@Bean
public Jackson2JsonRedisSerializer
需要注意的是,这里使用的配置如下所示:
spring:
...
#redis
redis:
cluster:
nodes: namenode22:6379,datanode23:6379,datanode24:6379
max-redirects: 6
timeout: 300000
jedis:
pool:
max-active: 8
max-wait: 100000
max-idle: 8
min-idle: 0
#自定义的监听的TOPIC路径
message:
topic-name: topic-test
3. 定义一个Redis消息的处理者
@Component
public class MessageReceiver {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private SimpMessagingTemplate messagingTemplate;
@Autowired
private SimpUserRegistry userRegistry;
public void receiveMessage(RedisWebsocketMsg redisWebsocketMsg) {
logger.info(MessageFormat.format("Received Message: {0}", redisWebsocketMsg));
//1. 取出用户名并判断是否连接到当前应用节点的WebSocket
SimpUser simpUser = userRegistry.getUser(redisWebsocketMsg.getReceiver());
if(simpUser != null && StringUtils.isNoneBlank(simpUser.getName())){
//2. 获取WebSocket客户端的订阅地址
WebSocketChannelEnum channelEnum = WebSocketChannelEnum.fromCode(redisWebsocketMsg.getChannelCode());
if(channelEnum != null){
//3. 给WebSocket客户端发送消息
messagingTemplate.convertAndSendToUser(redisWebsocketMsg.getReceiver(), channelEnum.getSubscribeUrl(), redisWebsocketMsg.getContent());
}
}
}
}
4. 在Controller中发送WebSocket消息
@Controller
@RequestMapping(("/wsTemplate"))
public class RedisMessageController {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Value("${spring.redis.message.topic-name}")
private String topicName;
@Autowired
private SimpMessagingTemplate messagingTemplate;
@Autowired
private SimpUserRegistry userRegistry;
@Resource(name = "redisServiceImpl")
private RedisService redisService;
@PostMapping("/sendToUser")
@ResponseBody
public String chat(HttpServletRequest request) {
//消息接收者
String receiver = request.getParameter("receiver");
//消息内容
String msg = request.getParameter("msg");
HttpSession session = SpringContextUtils.getSession();
User loginUser = (User) session.getAttribute(Constants.SESSION_USER);
HelloMessage resultData = new HelloMessage(MessageFormat.format("{0} say: {1}", loginUser.getUsername(), msg));
this.sendToUser(loginUser.getUsername(), receiver, WebSocketChannelEnum.CHAT.getSubscribeUrl(), JsonUtils.toJson(resultData));
return "ok";
}
private void sendToUser(String sender, String receiver, String destination, String payload){
SimpUser simpUser = userRegistry.getUser(receiver);
//如果接收者存在,则发送消息
if(simpUser != null && StringUtils.isNoneBlank(simpUser.getName())){
messagingTemplate.convertAndSendToUser(receiver, destination, payload);
}
//如果接收者在线,则说明接收者连接了集群的其他节点,需要通知接收者连接的那个节点发送消息
else if(redisService.isSetMember(Constants.REDIS_WEBSOCKET_USER_SET, receiver)){
RedisWebsocketMsg redisWebsocketMsg = new RedisWebsocketMsg<>(receiver, WebSocketChannelEnum.CHAT.getCode(), payload);
redisService.convertAndSend(topicName, redisWebsocketMsg);
}
//否则将消息存储到redis,等用户上线后主动拉取未读消息
else{
//存储消息的Redis列表名
String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + receiver + ":" + destination;
logger.info(MessageFormat.format("消息接收者{0}还未建立WebSocket连接,{1}发送的消息【{2}】将被存储到Redis的【{3}】列表中", receiver, sender, payload, listKey));
//存储消息到Redis中
redisService.addToListRight(listKey, ExpireEnum.UNREAD_MSG, payload);
}
}
@PostMapping("/pullUnreadMessage")
@ResponseBody
public Map pullUnreadMessage(String destination){
Map result = new HashMap<>();
try {
HttpSession session = SpringContextUtils.getSession();
//当前登录用户
User loginUser = (User) session.getAttribute(Constants.SESSION_USER);
//存储消息的Redis列表名
String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + loginUser.getUsername() + ":" + destination;
//从Redis中拉取所有未读消息
List messageList = redisService.rangeList(listKey, 0, -1);
result.put("code", "200");
if(messageList !=null && messageList.size() > 0){
//删除Redis中的这个未读消息列表
redisService.delete(listKey);
//将数据添加到返回集,供前台页面展示
result.put("result", messageList);
}
}catch (Exception e){
result.put("code", "500");
result.put("msg", e.getMessage());
}
return result;
}
}
5. WebSocket相关配置
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer{
@Autowired
private AuthHandshakeInterceptor authHandshakeInterceptor;
@Autowired
private MyHandshakeHandler myHandshakeHandler;
@Autowired
private MyChannelInterceptor myChannelInterceptor;
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/chat-websocket")
.addInterceptors(authHandshakeInterceptor)
.setHandshakeHandler(myHandshakeHandler)
.withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
//客户端需要把消息发送到/message/xxx地址
registry.setApplicationDestinationPrefixes("/message");
//服务端广播消息的路径前缀,客户端需要相应订阅/topic/yyy这个地址的消息
registry.enableSimpleBroker("/topic");
//给指定用户发送消息的路径前缀,默认值是/user/
registry.setUserDestinationPrefix("/user/");
}
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(myChannelInterceptor);
}
}
6. 示例页面
Chat With STOMP Message