文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

RocketMQ 如何保证发送消息不丢失?

2024-11-29 20:10

关注

本文源码基于: Apache RocketMQ release-5.2.0

一、同步发送

1.原理分析

在同步发送模式下,RocketMQ 默认采用同步刷盘方式,当生产者将消息发送到 Broker 后,会等待 Broker 的响应(默认超时 5分钟),Broker 接收消息后,会将其写入内存缓存,并进行刷盘操作。因此,如果 Broker 响应成功,代表消息一定成功写入磁盘。

同步发送主要涉及以下几个步骤:

如下示例代码为一个完整的同步发送流程:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;

public class SyncProducerTest {
  public static void main(String[] args) throws Exception {
    // 1、创建 producer,设置组名为 SyncGroupTest
    DefaultMQProducer producer = new DefaultMQProducer("SyncGroup");
    // 2、指定 NameServer的地址,以获取 Broker路由地址
    producer.setNamesrvAddr("x.x.x.x:9876");
    // 3、启动 producer
    producer.start();
    // 4、创建消息,并指定 Topic,Tag和消息体
    Message msg = new Message("SyncTopic", "sync", "SyncMessage".getBytes("UTF-8"));
    // 5、发送同步消息
    SendResult sendResult = producer.send(msg);
    // 6、通过 sendResult 判断消息是否成功送达
    System.out.printf("message send result:" + sendResult);
    // 7、关闭 Producer
    producer.shutdown();
  }
}

RocketMQ 的同步发送主要涉及以下几个关键源码类和方法:

源码参考:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(Message msg)

2.优缺点

优点:

缺点:

3.使用场景

适用于对消息可靠性要求较高的场景,如订单系统、金融交易、重要的消息通知等。

二、异步发送

1.原理分析

在异步发送模式下,RocketMQ 默认采用异步刷盘方式,当生产者发送消息到 Broker 后,消息写入内存缓存成功后,Broker 立即返回响应(默认超时 5分钟),后台线程再异步将消息批量写入磁盘。因此,这种方式提高了系统的吞吐量和性能,但在系统崩溃时可能会丢失部分未刷盘的消息。

异步发送主要涉及以下几个步骤:

如下示例代码为一个完整的异步发送流程:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;

public class AsyncProducerTest {
  public static void main(String[] args) throws Exception {
    // 1、创建 producer,设置组名为 AsyncGroupTest
    DefaultMQProducer producer = new DefaultMQProducer("AsyncGroup");
    // 2、指定 NameServer的地址,以获取 Broker路由地址
    producer.setNamesrvAddr("x.x.x.x:9876");
    // 3、启动 producer
    producer.start();
    // 4、创建消息,并指定Topic,Tag和消息体
    Message msg = new Message("AsyncTopic","async", "AsyncMessage".getBytes("UTF-8"));
    // 5、发送异步消息,SendCallback是处理异步回调的方法
    producer.send(msg, new SendCallback() {
      @Override
      public void onSuccess(SendResult sendResult) {  // 成功回调
        System.out.println("message send success: " + sendResult);
      }
      @Override
      public void onException(Throwable throwable) {  // 失败回调
        System.out.println("message send fail: " + throwable);
      }
    });
    // 6、关闭 Producer
    producer.shutdown();
  }
}

RocketMQ 的异步发送主要涉及以下几个关键源码类和方法:

源码参考:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(Message msg, SendCallback sendCallback)

2.优缺点

优点:

缺点:

3.使用场景

适用于对响应时间要求较高的场景,如实时数据处理、日志采集、消费信息的推送等。

三、单向发送

1.原理分析

单向(OneWay)发送是一种只负责发送消息而不等待任何响应的方式。生产者将消息发送到 Broker 后(默认超时 5分钟),不关心消息是否成功到达或被持久化,主要依赖 Broker 进行刷盘操作,单向发送通常与异步刷盘结合使用,以提高发送效率。

单向发送主要涉及以下几个步骤:

如下示例代码为一个完整的单向发送流程:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;

public class OneWayProducerTest {
  public static void main(String[] args) throws Exception {
    // 1、创建 producer,设置组名为 OneWayGroupTest
    DefaultMQProducer producer = new DefaultMQProducer("OneWayGroup");
    // 2、指定 NameServer的地址,以获取 Broker路由地址
    producer.setNamesrvAddr("x.x.x.x:9876");
    // 3、启动 producer
    producer.start();
    // 4、创建消息,并指定Topic,Tag和消息体
    Message msg = new Message("OneWayTopic","oneway", "OneWayMessage".getBytes("UTF-8"));
    // 5、发送单向消息
    producer.sendOneway(msg);
    // 6、关闭 Producer
    producer.shutdown();
  }
}

RocketMQ 的单向发送主要涉及以下几个关键类和方法:

源码参考:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendOneway(Message msg)

2.优缺点

优点:

缺点:

3.使用场景

适用于对可靠性要求不高的场景,如日志收集、监控数据上报等。

三种方式对比

发送方式

优点

缺点

使用场景

同步发送

可靠性高,简单易用

延迟较高,吞吐量受限

订单系统、金融交易、重要的消息通知等

异步发送

非阻塞,延迟较低

实现复杂度高,可靠性相对降低

实时数据处理、日志采集、消费信息的推送等

单向发送

高效,延迟最低

无法确认消息是否成功发送,可靠性最低

日志收集、监控数据上报等

如何选择?

每种发送方式都有其适用的场景和优缺点,具体如何选择,一定需要根据业务需求进行权衡。

总结

本文分析了 RocketMQ 同步发送、异步发送和单向发送三种方式的原理、优缺点以及使用场景,并且分析了每种方式涉及到的核心源码。

通过上文的介绍可以知道同步发送方式可以保证消息发送时不丢,但是性能相对其他两种方式差一些。

RocketMQ 是一款优秀的开源消息中间件,作为 Java程序员,建议多去阅读它的源码,吸收其中比较好的代码思维。

来源:猿java内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯