技术研究背景
由于目前的研发团队处于公司初创阶段,尚未有能成熟的运维体系,对于市面上常见的成熟MQ搭建维护能力不足,但是又希望能有一款轻量级的消息系统供研发团队的成员使用,因此开展了对该方面相关的技术调研工作。
通过相关的技术调研后,决定挑选基于Redis实现消息系统。
具体技术选型原因:
- 团队内部已经有搭建相关的Redis服务,并且具备一定的运维能力,可以节省技术成本
- 业界有较多关于Redis搭建消息系统方面的技术文章
- 目前的系统的整体吞吐量并不高,接入消息系统的主要目的只是为了实现系统之间的解耦
为了方便让读者们从0到1地学习这块内容,我将会从环节搭建开始介绍起。
基本环境的搭建
基于redis6.0.6版本搭建一套简单的消息队列系统。 环境部署:
docker run -p 6379:6379 --name redis_6_0_6 -d redis:6.0.6
- 参数解释: -d 后台启动 -p 端口映射 -name 容器名称
如果本地没有相关镜像,可以尝试通过搭建下方命令进行镜像的拉取:
docker pull redis:6.0.6
当redis的基础环境配置好了之后,接下来便是基于redis内置的一些基本功能开发一款消息队列组件了。
下边我将分三种不同的技术方案来介绍如何实现一款轻量级的消息队列。
基于常规的队列结构来实现消息队列
这块的实现比较简单,主要是基于Redis内部的List结构来落地的,发送方将消息从队列的左边写入,然后消费方从队列的右边读取。
package org.idea.mq.redis.framework.mq.list;
import com.alibaba.fastjson.JSON;
import org.idea.mq.redis.framework.bean.MsgWrapper;
import org.idea.mq.redis.framework.mq.IMQTemplate;
import org.idea.mq.redis.framework.redis.IRedisService;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
public class RedisListMQTemplate implements IMQTemplate {
@Resource
private IRedisService iRedisService;
@Override
public boolean send(MsgWrapper msgWrapper) {
try {
String json = JSON.toJSONString(msgWrapper.getMsgInfo());
iRedisService.lpush(msgWrapper.getTopic(),json);
return true;
}catch (Exception e){
e.printStackTrace();
}
return false;
}
}
问题思考
这里存在几个问题点需要思考下:
多个服务之间如何订阅同一个消息
这里我建议可以按照系统的项目名称前缀+业务标识来组织。
例如:用户系统中需要发布一条 会员已升级 的消息给到下游系统,此时可以将这条消息写入到名为:user-service:member-upgrade-list 的List集合中。
如果订单系统希望访问用户系统的消息,则需要在redis的key里指定user-service:member-upgrade-list关键字。
在这里插入图片描述
消息的监听机制如何实现?
对于List的消息可以采用轮询的方式获取,例如下边这段案例代码:
private void pollingGet(MsgWrapper msgWrapper) {
while (true) {
String value = iRedisService.rpop(msgWrapper.getTopic());
if (!StringUtils.isEmpty(value)) {
System.out.println(value);
}
//减少访问压力,定期睡眠一段时间
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
但是轮询的方式比较消耗性能,所以可以尝试使用Redis的阻塞式弹出指令,例如下边这种方式来监听消息的触发行为:
private void blockGet(MsgWrapper msgWrapper) {
while (true) {
List<String> values = iRedisService.brpop(msgWrapper.getTopic());
if (!CollectionUtils.isEmpty(values)) {
values.forEach(value -> {
System.out.println(value);
});
}
}
}
消息的可靠性传输如何确保?
在设计消息队列的时候,我们非常看重的就是消息的可靠性保证。当一条消息发送到消费端之后,如果出现了异常,希望消息能够实现重新发送的效果。
对于这种场景的设计我们可以尝试使用 BRPOPLPUSH 这条指令,这条指令可以帮助我们在Redis内部将数据弹出时写入到另一个备份队列中,这样即使弹出的消息消费失败了,备份队列中还有一份备用消息可以使用,而且弹出和写入备份队列操作在Redis内部做了封装,外界调用可以视作为一个原子操作。
是否可以支持广播的模式?
从List集合的实现原理来看,Redis弹出的元素只能返回给一个客户端链接,因此无法支持广播这种效果的实现。
基于发布订阅功能实现消息队列
Redis的内部提供了一个叫做发布订阅的功能,通过subscibe命令和publish指令可以帮助我们实现关于消息发布和通知的功能。
使用subscibe/publish命令实现的效果和List结构最大的不同在于它的传输方式:
- list更多的是实现点对点方式的传输(P2P方式)
- subscibe/publish则是可以实现广播的方式和订阅者进行通信
publish部分的案例代码:
@Override
public boolean publish(String channel, String content) {
try (Jedis jedis = iRedisFactory.getConnection()) {
jedis.publish(channel, content);
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
subscibe部分的代码:
@Override
public boolean subscribe(JedisPubSub jedisPubSub, String... channel) {
try (Jedis jedis = iRedisFactory.getConnection()) {
jedis.subscribe(jedisPubSub, channel);
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
监听的部分可以通过额外开启一个线程来实现这部分效果:
@Component
public class RedisSubscribeMQListener implements IMQListener {
@Resource
private IRedisService iRedisService;
class TestChannel extends JedisPubSub {
@Override
public void onMessage(String channel, String message) {
super.onMessage(channel, message);
System.out.println("channel " + channel + " 接收到消息:" + message);
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d",
channel, subscribedChannels));
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d",
channel, subscribedChannels));
}
}
//所有频道的消息都监听
@Override
public void onMessageReach(MsgWrapper msgWrapper) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
iRedisService.subscribe(new TestChannel(), msgWrapper.getTopic());
}
});
thread.start();
}
}
要注意,回调通知的时候需要注入一个JedisPubSub的对象,这个对象的内部定义了接收消息之后的处理操作。
问题思考
如何保证消息的可靠性传输?
通过subscibe/publish处理的消息没有持久化的特性,一旦出现网络中断,Redis宕机这类异常的时候就会导致消息丢失,而且也没有较好的机制取支持消息重复消费的问题。因此可靠性方面较差。
基于Stream实现消息队列
Redis5.0中发布的Stream类型,也用来实现典型的消息队列。提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。该Stream类型的出现,几乎满足了消息队列具备的全部内容,包括但不限于:
- 消息ID的序列化生成
- 消息遍历
- 消息的阻塞和非阻塞读取
- 消息的分组消费
- 未完成消息的处理
- 消息队列监控
关于Stream的一些基本入门篇章这里不做过多介绍,感兴趣的朋友可以去阅读下这篇文章:
https://xie.infoq.cn/article/cdb47caddc5ff49dc09ea58cd
下边的部分我们直接来进入关于Redis XStream相关的实战环节。
封装消息监听功能
首先是定义一个MQ相关的接口:
public interface RedisStreamListener {
HandlerResult handleMsg(StreamEntry streamEntry);
}
接着是基于这套接口做消息发送的实现:
package org.idea.mq.redis.framework.listener;
import com.alibaba.fastjson.JSON;
import org.idea.mq.redis.framework.bean.HandlerResult;
import org.idea.mq.redis.framework.config.StreamListener;
import org.idea.mq.redis.framework.mq.xstream.RedisStreamMQListener;
import org.idea.mq.redis.framework.redis.IRedisService;
import org.idea.mq.redis.framework.utils.PayMsg;
import redis.clients.jedis.StreamEntry;
import javax.annotation.Resource;
import java.util.Map;
import static org.idea.mq.redis.framework.config.MQConstants.SUCCESS;
@StreamListener(streamName = "order-service:order-payed-stream", groupName = "order-service-group", consumerName = "user-service-consumer")
public class OrderPayedListener implements RedisStreamMQListener {
@Resource
private IRedisService iRedisService;
@Override
public HandlerResult handleMsg(StreamEntry streamEntry) {
Map<String, String> map = streamEntry.getFields();
String json = map.get("json");
PayMsg payMsg = JSON.parseObject(json, PayMsg.class);
System.out.println("pending payMsg is : " + payMsg);
return SUCCESS;
}
}
自定义消息注解
package org.idea.mq.redis.framework.config;
import org.springframework.stereotype.Component;
import java.lang.annotation.*;
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface StreamListener {
String streamName() default "";
String groupName() default "";
String consumerName() default "";
}
代码中有一个自定义的@StreamListener的注解,该注解的内部包含了一个@Component的注解,可以将使用了该注解的对象注入到Spring容器中。
为了能将这些个初始化类进行自动装配,还需要加入一个配置的对象,代码如下:
package org.idea.mq.redis.framework.config;
import org.idea.mq.redis.framework.bean.HandlerResult;
import org.idea.mq.redis.framework.mq.xstream.RedisStreamMQListener;
import org.idea.mq.redis.framework.redis.IRedisService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.StreamPendingEntry;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import static org.idea.mq.redis.framework.config.MQConstants.SUCCESS;
@Configuration
public class StreamListenerConfiguration implements ApplicationListener<ApplicationReadyEvent> {
@Resource
private ApplicationContext applicationContext;
@Resource
private IRedisService iRedisService;
private static Logger logger = LoggerFactory.getLogger(StreamListenerConfiguration.class);
@Override
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
Map<String, RedisStreamMQListener> beanMap = applicationContext.getBeansOfType(RedisStreamMQListener.class);
beanMap.values().forEach(redisStreamMQListener -> {
StreamListener StreamListener = redisStreamMQListener.getClass().getAnnotation(StreamListener.class);
ListenerInitWrapper listenerInitWrapper = new ListenerInitWrapper(StreamListener.streamName(), StreamListener.groupName(), StreamListener.consumerName());
Thread handleThread = new Thread(new CoreMsgHandlerThread(listenerInitWrapper, redisStreamMQListener, iRedisService));
Thread pendingHandleThread = new Thread(new PendingMsgHandlerThread(listenerInitWrapper, redisStreamMQListener, iRedisService));
handleThread.start();
pendingHandleThread.start();
logger.info("{} load successed ", redisStreamMQListener);
});
}
class PendingMsgHandlerThread implements Runnable {
private ListenerInitWrapper listenerInitWrapper;
private RedisStreamMQListener redisStreamMQListener;
private IRedisService iRedisService;
public PendingMsgHandlerThread(ListenerInitWrapper listenerInitWrapper, RedisStreamMQListener redisStreamMQListener, IRedisService iRedisService) {
this.redisStreamMQListener = redisStreamMQListener;
this.listenerInitWrapper = listenerInitWrapper;
this.iRedisService = iRedisService;
}
@Override
public void run() {
String startId = "0-0";
while (true) {
List<StreamPendingEntry> streamConsumersInfos = iRedisService.xpending(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), new StreamEntryID(startId), 1);
//如果该集合非空,则触发监听行为
if (!CollectionUtils.isEmpty(streamConsumersInfos)) {
for (StreamPendingEntry streamConsumersInfo : streamConsumersInfos) {
StreamEntryID streamEntryID = streamConsumersInfo.getID();
//比当前pending的streamId小1
String streamIdStr = streamEntryID.toString();
String[] items = streamIdStr.split("-");
Long timestamp = Long.valueOf(items[0]) - 1;
String beforeId = timestamp + "-" + "0";
List<Map.Entry<String, List<StreamEntry>>> result = iRedisService.xreadGroup(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), new StreamEntryID(beforeId), 1, listenerInitWrapper.getConsumerName());
for (Map.Entry<String, List<StreamEntry>> streamInfo : result) {
List<StreamEntry> streamEntries = streamInfo.getValue();
for (StreamEntry streamEntry : streamEntries) {
try {
//业务处理
HandlerResult handlerResult = redisStreamMQListener.handleMsg(streamEntry);
if (SUCCESS.equals(handlerResult)) {
startId = streamEntryID.toString();
iRedisService.xack(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), new StreamEntryID(startId));
}
} catch (Exception e) {
logger.error("[PendingMsgHandlerThread] e is ", e);
}
}
}
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class CoreMsgHandlerThread implements Runnable {
private ListenerInitWrapper listenerInitWrapper;
private RedisStreamMQListener redisStreamMQListener;
private IRedisService iRedisService;
public CoreMsgHandlerThread(ListenerInitWrapper listenerInitWrapper, RedisStreamMQListener redisStreamMQListener, IRedisService iRedisService) {
this.redisStreamMQListener = redisStreamMQListener;
this.listenerInitWrapper = listenerInitWrapper;
this.iRedisService = iRedisService;
}
@Override
public void run() {
while (true) {
List<Map.Entry<String, List<StreamEntry>>> streamConsumersInfos = iRedisService.xreadGroup(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), StreamEntryID.UNRECEIVED_ENTRY, 1, listenerInitWrapper.getConsumerName());
for (Map.Entry<String, List<StreamEntry>> streamInfo : streamConsumersInfos) {
List<StreamEntry> streamEntries = streamInfo.getValue();
for (StreamEntry streamEntry : streamEntries) {
//业务处理
try {
HandlerResult result = redisStreamMQListener.handleMsg(streamEntry);
if (SUCCESS.equals(result)) {
iRedisService.xack(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), streamEntry.getID());
}
} catch (Exception e) {
logger.error("[CoreMsgHandlerThread] e is ", e);
}
}
}
}
}
}
}
其原理是在Spring容器启动好了之后,监听Spring容器内部发出的ApplicationReadyEvent事件,监听该事件,并且开启两个后台线程用于处理redis内部的stream数据。
封装相关的消息发布功能
消息的发送部分比较简单,直接通过redis往stream里面写入数据即可
package org.idea.mq.redis.framework.producer;
public interface IStreamProducer {
void sendMsg(String streamName, String json);
}
消息的传输格式采用json字符串的方式写入到redis内部的stream当中。
package org.idea.mq.redis.framework.producer;
import org.idea.mq.redis.framework.redis.IRedisService;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
public class StreamProducer implements IStreamProducer{
@Resource
private IRedisService iRedisService;
@Override
public void sendMsg(String streamName,String json){
Map<String,String> map = new HashMap<>();
map.put("json",json);
iRedisService.xAdd(streamName,map);
}
}
注意,写入底层的时候,我使用的是Redis内部自动生成的ID序号,代码如下:
@Override
public boolean xAdd(String streamName, Map<String, String> stringMap) {
try (Jedis jedis = iRedisFactory.getConnection()) {
jedis.xadd(streamName, StreamEntryID.NEW_ENTRY, stringMap);
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
为了方便将其作为一个SpringBoot的starter组件供外界团队人员使用,我们可以将其封装为一个starter组件:
在这里插入图片描述
组件的测试
点对点发送测试
建立两套微服务工程,user-service 和 order-service,其中user-service部署两个服务节点,同属user-service-group。order-service也要部署两个服务节点,同属order-service-group。
最后两个微服务集群之间互相发布对方订阅的消息,查看是否能够正常接受,且同一个组内一次只有一个节点接收消息。
在这里插入图片描述
广播发送测试
使用之前搭建好的user-service模块,部署四个节点,订阅同一个stream队列,但是将其groupName设置为不同的属性,最后发布消息,查看四个节点都能正常接收。
在这里插入图片描述
具体细节在现有工程内部已经建立了测试模版,感兴趣的朋友可以去阅读下mq-redis-test模块的部分。
问题思考
为何同一个StreamName需要采用双线程消费?
一个线程用于接受Stream内部正常数据,如果业务正常处理则对其返回为ack信号,确认该消息已经消费成功。如果处理过程中出现异常,则不反回ACK信号,此时Redis内部会将该消息放入到Pending队列中,而第二个线程专门用于处理Pending队列内部的数据。如果处于Pending状态的消息第二次消费依然失败,则会进行定时轮询状况。
是否支持延迟重试
目前的设计其实一直都存在不足点,例如当消息消费异常后会进入轮询,严重情况下可能会导致消息消费出现死循环,并且一直堵塞。暂时还未实现类似于RocketMQ的那种间隔1,3,5...分钟定时投递消费失败消息都功能。感兴趣的小伙伴可以基于现有代码进行简单改造。