文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

4 张图,9 个维度告诉你怎么做能确保 RocketMQ 不丢失消息

2024-12-02 07:53

关注

引入消息队列可以方便地实现系统解耦、削峰填谷等作用。但是消息队列使用不当,可能会引起消息丢失,在一些消息敏感的业务场景下,这是不允许的。今天我们来聊一聊 RocketMQ 怎么做能确保消息不丢失。

1 RocketMQ 简介

RocketMQ 是阿里巴巴开源的分布式消息中间件,整体架构如下图:

RocketMQ 主要包括 Producer、Consumer 和 Broker,同时 Name Server 进行集群注册管理和保存元数据。

2 消息不丢失

要想保证消息不丢失,需要从以下几个方面考虑:

维度 1:同步发送,代码如下:

public void send() throws Exception {
String message = "test producer";
Message sendMessage = new Message("topic1", "tag1", message.getBytes());
sendMessage.putUserProperty("name1","value1");
SendResult sendResult = null;

DefaultMQProducer producer = new DefaultMQProducer("testGroup");
producer.setNamesrvAddr("localhost:9876");
producer.setRetryTimesWhenSendFailed(3);
try {
sendResult = producer.send(sendMessage);
} catch (Exception e) {
e.printStackTrace();
}
if (sendResult != null) {
System.out.println(sendResult.getSendStatus());
}
}

同步发送会返回 4 个状态码:

根据返回的状态码,可以做消息重试,这里设置的重试次数是 3。

消息重试时,消费端一定要做好幂等处理。

维度 2:异步发送,代码如下:

public void sendAsync() throws Exception {
String message = "test producer";
Message sendMessage = new Message("topic1", "tag1", message.getBytes());
sendMessage.putUserProperty("name1","value1");

DefaultMQProducer producer = new DefaultMQProducer("testGroup");
producer.setNamesrvAddr("localhost:9876");
producer.setRetryTimesWhenSendFailed(3);
producer.send(sendMessage, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {

}

@Override
public void onException(Throwable e) {
// TODO 可以在这里加入重试逻辑
}
});
}

异步发送,可以重写回调函数,回调函数捕获到 Exception 时表示发送失败,这时可以进行重试,这里设置的重试次数是 3。

维度 3:刷盘策略

flushDiskType=SYNC_FLUSH

维度 4:Broker 多副本和高可用

Broker 为了保证高可用,采用一主多从的方式部署。如下图:

消息发送到 master 节点后,slave 节点会从 master 拉取消息保持跟 master 的一致。这个过程默认是异步的,即 master 收到消息后,不等 slave 节点复制消息就直接给 Producer 返回成功。

这样会有一个问题,如果 slave 节点还没有完成消息复制,这时 master 宕机了,进行主备切换后就会有消息丢失。为了避免这个问题,可以采用 slave 节点同步复制消息,即等 slave 节点复制消息成功后再给 Producer 返回发送成功。只需要增加下面的配置:

brokerRole=SYNC_MASTER

改为同步复制后,消息复制流程如下:

维度 5:消息确认

Consumer 消费消息的代码如下:

public void consume() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.subscribe("topic1", "tag1");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
try{
System.out.printf("Receive New Messages: %s", msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}catch (Exception e){
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start();
}

如果 Consumer 消费成功,返回 CONSUME_SUCCESS,提交 offset 并从 Broker 拉取下一批消息。

维度 6:Consumer 重试

Consumer 消费失败,这里有 3 种情况:

Broker 收到这个响应后,会把这条消息放入重试队列,重新发送给 Consumer。

注意:

其实重试 3 次都失败就可以说明代码有问题,这时 Consumer 可以把消息存入本地,给 Broker 返回CONSUME_SUCCESS 来结束重试。代码如下:

int count = ((MessageExt) msgs).getReconsumeTimes();
if (count > 2) {
//TODO 把消息写入本地存储
System.out.println("重试次数超过3次");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

维度7:事务消息

RocketMQ支持事务消息,整体流程如下图:

代码如下:

public class ProducerTransactionListenerImpl implements TransactionListener {

@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {

return LocalTransactionState.COMMIT_MESSAGE;
}

@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {

return LocalTransactionState.UNKNOW;
}
}

维度 8:消息索引

我们知道,RocketMQ 核心的数据文件有 3 个:CommitLog、ConsumeQueue 和 Index。其中Index 文件就是一个索引文件,结构如下图:

查找消息时,首先根据消息 key 的 hashcode 计算出 Hash 槽的位置,然后读取 Hash 槽的值计算 Index 条目的位置,从Index 条目位置读取到消息在 CommitLog 文件中的 offset,从而查找到消息。

在 Producer 发送消息时,可以指定一个 key,代码如下:

Message sendMessage = new Message("topic1", "tag1", message.getBytes());
sendMessage.setKeys("weiyiid");

这样可以通过 RocketMQ 提供的命令或者管理控制台来查询消息是否发送成功。

维度 9:极端情况

如果对消息丢失零容忍,我们必须要考虑极端情况,比如整个 RocketMQ 集群挂了,这时 Producer 端发送消息一定会失败,可以考虑在 Producer 端做降级,把要发送的消息保存到本地数据库或磁盘,等 RocketMQ 恢复以后再把本地消息推送出去。

3 总结

在一些特殊的业务场景,比如支付、银行核算等,需要确保消息不丢失,但是同时也要看到,消息不丢失的方案会大大降低 RocketMQ 的吞吐量,需要综合考虑。


来源:君哥聊技术内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯