文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

实战干货:基于Redis6.0 部署迷你版本消息队列

2024-12-02 07:11

关注

技术研究背景

由于目前的研发团队处于公司初创阶段,尚未有能成熟的运维体系,对于市面上常见的成熟MQ搭建维护能力不足,但是又希望能有一款轻量级的消息系统供研发团队的成员使用,因此开展了对该方面相关的技术调研工作。

通过相关的技术调研后,决定挑选基于Redis实现消息系统。

具体技术选型原因:

为了方便让读者们从0到1地学习这块内容,我将会从环节搭建开始介绍起。

基本环境的搭建

基于redis6.0.6版本搭建一套简单的消息队列系统。 环境部署:

docker run -p 6379:6379 --name redis_6_0_6 -d redis:6.0.6

如果本地没有相关镜像,可以尝试通过搭建下方命令进行镜像的拉取:

docker pull redis:6.0.6

当redis的基础环境配置好了之后,接下来便是基于redis内置的一些基本功能开发一款消息队列组件了。

下边我将分三种不同的技术方案来介绍如何实现一款轻量级的消息队列。

基于常规的队列结构来实现消息队列

这块的实现比较简单,主要是基于Redis内部的List结构来落地的,发送方将消息从队列的左边写入,然后消费方从队列的右边读取。

package org.idea.mq.redis.framework.mq.list;
import com.alibaba.fastjson.JSON;
import org.idea.mq.redis.framework.bean.MsgWrapper;
import org.idea.mq.redis.framework.mq.IMQTemplate;
import org.idea.mq.redis.framework.redis.IRedisService;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;

@Component
public class RedisListMQTemplate implements IMQTemplate {
@Resource
private IRedisService iRedisService;
@Override
public boolean send(MsgWrapper msgWrapper) {
try {
String json = JSON.toJSONString(msgWrapper.getMsgInfo());
iRedisService.lpush(msgWrapper.getTopic(),json);
return true;
}catch (Exception e){
e.printStackTrace();
}
return false;
}
}

问题思考

这里存在几个问题点需要思考下:

多个服务之间如何订阅同一个消息

这里我建议可以按照系统的项目名称前缀+业务标识来组织。

例如:用户系统中需要发布一条 会员已升级 的消息给到下游系统,此时可以将这条消息写入到名为:user-service:member-upgrade-list 的List集合中。

如果订单系统希望访问用户系统的消息,则需要在redis的key里指定user-service:member-upgrade-list关键字。

在这里插入图片描述

消息的监听机制如何实现?

对于List的消息可以采用轮询的方式获取,例如下边这段案例代码:


private void pollingGet(MsgWrapper msgWrapper) {
while (true) {
String value = iRedisService.rpop(msgWrapper.getTopic());
if (!StringUtils.isEmpty(value)) {
System.out.println(value);
}
//减少访问压力,定期睡眠一段时间
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

但是轮询的方式比较消耗性能,所以可以尝试使用Redis的阻塞式弹出指令,例如下边这种方式来监听消息的触发行为:


private void blockGet(MsgWrapper msgWrapper) {
while (true) {
List<String> values = iRedisService.brpop(msgWrapper.getTopic());
if (!CollectionUtils.isEmpty(values)) {
values.forEach(value -> {
System.out.println(value);
});
}
}
}

消息的可靠性传输如何确保?

在设计消息队列的时候,我们非常看重的就是消息的可靠性保证。当一条消息发送到消费端之后,如果出现了异常,希望消息能够实现重新发送的效果。

对于这种场景的设计我们可以尝试使用 BRPOPLPUSH 这条指令,这条指令可以帮助我们在Redis内部将数据弹出时写入到另一个备份队列中,这样即使弹出的消息消费失败了,备份队列中还有一份备用消息可以使用,而且弹出和写入备份队列操作在Redis内部做了封装,外界调用可以视作为一个原子操作。

是否可以支持广播的模式?

从List集合的实现原理来看,Redis弹出的元素只能返回给一个客户端链接,因此无法支持广播这种效果的实现。

基于发布订阅功能实现消息队列

Redis的内部提供了一个叫做发布订阅的功能,通过subscibe命令和publish指令可以帮助我们实现关于消息发布和通知的功能。

使用subscibe/publish命令实现的效果和List结构最大的不同在于它的传输方式:

publish部分的案例代码:

@Override
public boolean publish(String channel, String content) {
try (Jedis jedis = iRedisFactory.getConnection()) {
jedis.publish(channel, content);
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

subscibe部分的代码:

@Override
public boolean subscribe(JedisPubSub jedisPubSub, String... channel) {
try (Jedis jedis = iRedisFactory.getConnection()) {
jedis.subscribe(jedisPubSub, channel);
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

监听的部分可以通过额外开启一个线程来实现这部分效果:

@Component
public class RedisSubscribeMQListener implements IMQListener {
@Resource
private IRedisService iRedisService;
class TestChannel extends JedisPubSub {
@Override
public void onMessage(String channel, String message) {
super.onMessage(channel, message);
System.out.println("channel " + channel + " 接收到消息:" + message);
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d",
channel, subscribedChannels));
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d",
channel, subscribedChannels));
}
}
//所有频道的消息都监听
@Override
public void onMessageReach(MsgWrapper msgWrapper) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
iRedisService.subscribe(new TestChannel(), msgWrapper.getTopic());
}
});
thread.start();
}
}

要注意,回调通知的时候需要注入一个JedisPubSub的对象,这个对象的内部定义了接收消息之后的处理操作。

问题思考

如何保证消息的可靠性传输?

通过subscibe/publish处理的消息没有持久化的特性,一旦出现网络中断,Redis宕机这类异常的时候就会导致消息丢失,而且也没有较好的机制取支持消息重复消费的问题。因此可靠性方面较差。

基于Stream实现消息队列

Redis5.0中发布的Stream类型,也用来实现典型的消息队列。提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。该Stream类型的出现,几乎满足了消息队列具备的全部内容,包括但不限于:

关于Stream的一些基本入门篇章这里不做过多介绍,感兴趣的朋友可以去阅读下这篇文章:

​  https://xie.infoq.cn/article/cdb47caddc5ff49dc09ea58cd ​

下边的部分我们直接来进入关于Redis XStream相关的实战环节。

封装消息监听功能

首先是定义一个MQ相关的接口:

public interface RedisStreamListener {

HandlerResult handleMsg(StreamEntry streamEntry);
}

接着是基于这套接口做消息发送的实现:

package org.idea.mq.redis.framework.listener;
import com.alibaba.fastjson.JSON;
import org.idea.mq.redis.framework.bean.HandlerResult;
import org.idea.mq.redis.framework.config.StreamListener;
import org.idea.mq.redis.framework.mq.xstream.RedisStreamMQListener;
import org.idea.mq.redis.framework.redis.IRedisService;
import org.idea.mq.redis.framework.utils.PayMsg;
import redis.clients.jedis.StreamEntry;
import javax.annotation.Resource;
import java.util.Map;
import static org.idea.mq.redis.framework.config.MQConstants.SUCCESS;

@StreamListener(streamName = "order-service:order-payed-stream", groupName = "order-service-group", consumerName = "user-service-consumer")
public class OrderPayedListener implements RedisStreamMQListener {
@Resource
private IRedisService iRedisService;
@Override
public HandlerResult handleMsg(StreamEntry streamEntry) {
Map<String, String> map = streamEntry.getFields();
String json = map.get("json");
PayMsg payMsg = JSON.parseObject(json, PayMsg.class);
System.out.println("pending payMsg is : " + payMsg);
return SUCCESS;
}
}

自定义消息注解

package org.idea.mq.redis.framework.config;
import org.springframework.stereotype.Component;
import java.lang.annotation.*;

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface StreamListener {
String streamName() default "";
String groupName() default "";
String consumerName() default "";
}

代码中有一个自定义的@StreamListener的注解,该注解的内部包含了一个@Component的注解,可以将使用了该注解的对象注入到Spring容器中。

为了能将这些个初始化类进行自动装配,还需要加入一个配置的对象,代码如下:

package org.idea.mq.redis.framework.config;
import org.idea.mq.redis.framework.bean.HandlerResult;
import org.idea.mq.redis.framework.mq.xstream.RedisStreamMQListener;
import org.idea.mq.redis.framework.redis.IRedisService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.StreamPendingEntry;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import static org.idea.mq.redis.framework.config.MQConstants.SUCCESS;

@Configuration
public class StreamListenerConfiguration implements ApplicationListener<ApplicationReadyEvent> {
@Resource
private ApplicationContext applicationContext;
@Resource
private IRedisService iRedisService;
private static Logger logger = LoggerFactory.getLogger(StreamListenerConfiguration.class);
@Override
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
Map<String, RedisStreamMQListener> beanMap = applicationContext.getBeansOfType(RedisStreamMQListener.class);
beanMap.values().forEach(redisStreamMQListener -> {
StreamListener StreamListener = redisStreamMQListener.getClass().getAnnotation(StreamListener.class);
ListenerInitWrapper listenerInitWrapper = new ListenerInitWrapper(StreamListener.streamName(), StreamListener.groupName(), StreamListener.consumerName());
Thread handleThread = new Thread(new CoreMsgHandlerThread(listenerInitWrapper, redisStreamMQListener, iRedisService));
Thread pendingHandleThread = new Thread(new PendingMsgHandlerThread(listenerInitWrapper, redisStreamMQListener, iRedisService));
handleThread.start();
pendingHandleThread.start();
logger.info("{} load successed ", redisStreamMQListener);
});
}
class PendingMsgHandlerThread implements Runnable {
private ListenerInitWrapper listenerInitWrapper;
private RedisStreamMQListener redisStreamMQListener;
private IRedisService iRedisService;
public PendingMsgHandlerThread(ListenerInitWrapper listenerInitWrapper, RedisStreamMQListener redisStreamMQListener, IRedisService iRedisService) {
this.redisStreamMQListener = redisStreamMQListener;
this.listenerInitWrapper = listenerInitWrapper;
this.iRedisService = iRedisService;
}
@Override
public void run() {
String startId = "0-0";
while (true) {
List<StreamPendingEntry> streamConsumersInfos = iRedisService.xpending(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), new StreamEntryID(startId), 1);
//如果该集合非空,则触发监听行为
if (!CollectionUtils.isEmpty(streamConsumersInfos)) {
for (StreamPendingEntry streamConsumersInfo : streamConsumersInfos) {
StreamEntryID streamEntryID = streamConsumersInfo.getID();
//比当前pending的streamId小1
String streamIdStr = streamEntryID.toString();
String[] items = streamIdStr.split("-");
Long timestamp = Long.valueOf(items[0]) - 1;
String beforeId = timestamp + "-" + "0";
List<Map.Entry<String, List<StreamEntry>>> result = iRedisService.xreadGroup(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), new StreamEntryID(beforeId), 1, listenerInitWrapper.getConsumerName());
for (Map.Entry<String, List<StreamEntry>> streamInfo : result) {
List<StreamEntry> streamEntries = streamInfo.getValue();
for (StreamEntry streamEntry : streamEntries) {
try {
//业务处理
HandlerResult handlerResult = redisStreamMQListener.handleMsg(streamEntry);
if (SUCCESS.equals(handlerResult)) {
startId = streamEntryID.toString();
iRedisService.xack(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), new StreamEntryID(startId));
}
} catch (Exception e) {
logger.error("[PendingMsgHandlerThread] e is ", e);
}
}
}
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class CoreMsgHandlerThread implements Runnable {
private ListenerInitWrapper listenerInitWrapper;
private RedisStreamMQListener redisStreamMQListener;
private IRedisService iRedisService;
public CoreMsgHandlerThread(ListenerInitWrapper listenerInitWrapper, RedisStreamMQListener redisStreamMQListener, IRedisService iRedisService) {
this.redisStreamMQListener = redisStreamMQListener;
this.listenerInitWrapper = listenerInitWrapper;
this.iRedisService = iRedisService;
}
@Override
public void run() {
while (true) {
List<Map.Entry<String, List<StreamEntry>>> streamConsumersInfos = iRedisService.xreadGroup(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), StreamEntryID.UNRECEIVED_ENTRY, 1, listenerInitWrapper.getConsumerName());
for (Map.Entry<String, List<StreamEntry>> streamInfo : streamConsumersInfos) {
List<StreamEntry> streamEntries = streamInfo.getValue();
for (StreamEntry streamEntry : streamEntries) {
//业务处理
try {
HandlerResult result = redisStreamMQListener.handleMsg(streamEntry);
if (SUCCESS.equals(result)) {
iRedisService.xack(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), streamEntry.getID());
}
} catch (Exception e) {
logger.error("[CoreMsgHandlerThread] e is ", e);
}
}
}
}
}
}
}

其原理是在Spring容器启动好了之后,监听Spring容器内部发出的ApplicationReadyEvent事件,监听该事件,并且开启两个后台线程用于处理redis内部的stream数据。

封装相关的消息发布功能

消息的发送部分比较简单,直接通过redis往stream里面写入数据即可

package org.idea.mq.redis.framework.producer;

public interface IStreamProducer {

void sendMsg(String streamName, String json);
}

消息的传输格式采用json字符串的方式写入到redis内部的stream当中。

package org.idea.mq.redis.framework.producer;
import org.idea.mq.redis.framework.redis.IRedisService;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

public class StreamProducer implements IStreamProducer{
@Resource
private IRedisService iRedisService;
@Override
public void sendMsg(String streamName,String json){
Map<String,String> map = new HashMap<>();
map.put("json",json);
iRedisService.xAdd(streamName,map);
}
}

注意,写入底层的时候,我使用的是Redis内部自动生成的ID序号,代码如下:

@Override
public boolean xAdd(String streamName, Map<String, String> stringMap) {
try (Jedis jedis = iRedisFactory.getConnection()) {
jedis.xadd(streamName, StreamEntryID.NEW_ENTRY, stringMap);
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

为了方便将其作为一个SpringBoot的starter组件供外界团队人员使用,我们可以将其封装为一个starter组件:

在这里插入图片描述

组件的测试

点对点发送测试

建立两套微服务工程,user-service 和 order-service,其中user-service部署两个服务节点,同属user-service-group。order-service也要部署两个服务节点,同属order-service-group。

最后两个微服务集群之间互相发布对方订阅的消息,查看是否能够正常接受,且同一个组内一次只有一个节点接收消息。

在这里插入图片描述

广播发送测试

使用之前搭建好的user-service模块,部署四个节点,订阅同一个stream队列,但是将其groupName设置为不同的属性,最后发布消息,查看四个节点都能正常接收。

在这里插入图片描述

具体细节在现有工程内部已经建立了测试模版,感兴趣的朋友可以去阅读下mq-redis-test模块的部分。

问题思考

为何同一个StreamName需要采用双线程消费?

一个线程用于接受Stream内部正常数据,如果业务正常处理则对其返回为ack信号,确认该消息已经消费成功。如果处理过程中出现异常,则不反回ACK信号,此时Redis内部会将该消息放入到Pending队列中,而第二个线程专门用于处理Pending队列内部的数据。如果处于Pending状态的消息第二次消费依然失败,则会进行定时轮询状况。

是否支持延迟重试

目前的设计其实一直都存在不足点,例如当消息消费异常后会进入轮询,严重情况下可能会导致消息消费出现死循环,并且一直堵塞。暂时还未实现类似于RocketMQ的那种间隔1,3,5...分钟定时投递消费失败消息都功能。感兴趣的小伙伴可以基于现有代码进行简单改造。

来源:Java知音内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯