文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Java RabbitMQ如何实现持久化和发布确认

2023-06-29 10:46

关注

这篇文章主要介绍Java RabbitMQ如何实现持久化和发布确认,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!

1. 持久化

当RabbitMQ服务停掉以后消息生产者发送过的消息不丢失。默认情况下RabbitMQ退出或者崩溃时,会忽视掉队列和消息。为了保证消息不丢失需要将队列和消息都标记为持久化。

1.1 实现持久化

队列持久化:在创建队列时将channel.queueDeclare();第二个参数改为true。

消息持久化:在使用信道发送消息时channel.basicPublish();将第三个参数改为:MessageProperties.PERSISTENT_TEXT_PLAIN表示持久化消息。

public class Producer3 {    private static final String LONG_QUEUE = "long_queue";    public static void main(String[] args) throws Exception {        Channel channel = RabbitMQUtils.getChannel();        // 持久化队列        channel.queueDeclare(LONG_QUEUE,true,false,false,null);        Scanner scanner = new Scanner(System.in);        int i = 0;        while (scanner.hasNext()){            i++;            String msg = scanner.next() + i;            // 持久化消息            channel.basicPublish("",LONG_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));            System.out.println("发送消息:'" + msg + "'成功");        }    }}

但是存储消息还有存在一个缓存的间隔点,没有真正的写入磁盘,持久性保证不够强,但是对于简单队列而言也绰绰有余。

1.2 不公平分发

轮询分发的方式在消费者处理效率不同的情况下并不适用。所以真正的公平应该是遵循能者多劳的前提。

在消费者处修改channel.basicQos(1);表示开启不公平分发

public class Consumer2 {    private static final String LONG_QUEUE = "long_queue";    public static void main(String[] args) throws Exception {        Channel channel = RabbitMQUtils.getChannel();        DeliverCallback deliverCallback = (consumerTag, message) -> {            // 模拟并发沉睡三十秒            try {                Thread.sleep(30000);                System.out.println("线程B接收消息:"+ new String(message.getBody(), StandardCharsets.UTF_8));                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);            } catch (InterruptedException e) {                e.printStackTrace();            }        };        // 设置不公平分发        channel.basicQos(1);        channel.basicConsume(LONG_QUEUE,false,deliverCallback,                consumerTag -> {                    System.out.println(consumerTag + "消费者取消消费");                });    }}

1.3 测试不公平分发

测试目的:是否能实现能者多劳。

测试方法:两个消费者睡眠不同的事件来模拟处理事件不同,如果处理时间(睡眠时间)短的能够处理多个消息就代表目的达成。

先启动生产者创建队列,再分别启动两个消费者。

生产者按照顺序发四条消息:

Java RabbitMQ如何实现持久化和发布确认

睡眠时间短的线程A接收到了三条消息

Java RabbitMQ如何实现持久化和发布确认

而睡眠时间长的线程B只接收到的第二条消息:

Java RabbitMQ如何实现持久化和发布确认

因为线程B在处理消息时消耗的时间较长,所以就将其他消息分配给了线程A。

实验成功!

1.4 预取值

消息的发送和手动确认都是异步完成的,因此就存在一个未确认消息的缓冲区,开发人员希望能够限制缓冲区的大小,用来避免缓冲区里面无限制的未确认消息问题。

这里的预期值就值得是上述方法channel.basicQos();里面的参数,如果在当前信道上存在等于参数的消息就不会在安排当前信道进行消费消息。

1.4.1 代码测试

测试方法:

新建两个不同的消费者分别给定预期值5个2。

给睡眠时间长的指定为5,时间短的指定为2。

假如按照指定的预期值获取消息则表示测试成功,但并不是代表一定会按照5和2分配,这个类似于权重的判别。

代码根据上述代码修改预期值即可。

2. 发布确认

发布确认就是生产者发布消息到队列之后,队列确认进行持久化完毕再通知给生产者的过程。这样才能保证消息不会丢失。

需要注意的是需要开启队列持久化才能使用确认发布。
开启方法:channel.confirmSelect();

2.1 单个确认发布

是一种同步发布的方式,即发送完一个消息之后只有确认它确认发布后,后续的消息才会继续发布,在指定的时间内没有确认就会抛出异常。缺点就是特别慢。

public class SoloProducer {    private static final int MESSAGE_COUNT = 100;    private static final String QUEUE_NAME = "confirm_solo";    public static void main(String[] args) throws Exception {        Channel channel = RabbitMQUtils.getChannel();        // 产生队列        channel.queueDeclare(QUEUE_NAME,true,false,false,null);        // 开启确认发布        channel.confirmSelect();        // 记录开始时间        long beginTime = System.currentTimeMillis();        for (int i = 0; i < MESSAGE_COUNT; i++) {            String msg = ""+i;            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));            // 单个发布确认            boolean flag = channel.waitForConfirms();            if (flag){                System.out.println("发送消息:" + i);            }        }        // 记录结束时间        long endTime = System.currentTimeMillis();        System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");   }}

2.2 批量确认发布

一批一批的确认发布可以提高系统的吞吐量。但是缺点是发生故障导致发布出现问题时,需要将整个批处理保存在内存中,后面再重新发布。

public class BatchProducer {    private static final int MESSAGE_COUNT = 100;    private static final String QUEUE_NAME = "confirm_batch";    public static void main(String[] args) throws Exception {        Channel channel = RabbitMQUtils.getChannel();        // 产生队列        channel.queueDeclare(QUEUE_NAME,true,false,false,null);        // 开启确认发布        channel.confirmSelect();        // 设置一个多少一批确认一次。        int batchSize = MESSAGE_COUNT / 10;        // 记录开始时间        long beginTime = System.currentTimeMillis();        for (int i = 0; i < MESSAGE_COUNT; i++) {            String msg = ""+i;            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));            // 批量发布确认            if (i % batchSize == 0){                if (channel.waitForConfirms()){                    System.out.println("发送消息:" + i);                }            }        }        // 记录结束时间        long endTime = System.currentTimeMillis();        System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");    }}

显然效率要比单个确认发布的高很多。

2.3 异步确认发布

在编程上比上述两个要复杂,但是性价比很高,无论是可靠性还行效率的都好很多,利用回调函数来达到消息可靠性传递的。

public class AsyncProducer {    private static final int MESSAGE_COUNT = 100;    private static final String QUEUE_NAME = "confirm_async";    public static void main(String[] args) throws Exception {        Channel channel = RabbitMQUtils.getChannel();        // 产生队列        channel.queueDeclare(QUEUE_NAME,true,false,false,null);        // 开启确认发布        channel.confirmSelect();        // 记录开始时间        long beginTime = System.currentTimeMillis();        // 确认成功回调        ConfirmCallback ackCallback = (deliveryTab,multiple) ->{            System.out.println("确认成功消息:" + deliveryTab);        };        // 确认失败回调        ConfirmCallback nackCallback = (deliveryTab,multiple) ->{            System.out.println("未确认的消息:" + deliveryTab);        };        // 消息监听器                channel.addConfirmListener(ackCallback,nackCallback);        for (int i = 0; i < MESSAGE_COUNT; i++) {            String msg = "" + i;            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));        }        // 记录结束时间        long endTime = System.currentTimeMillis();        System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");    }}

2.4 处理未确认的消息

最好的处理方式把未确认的消息放到一个基于内存的能被发布线程访问的队列。

例如:ConcurrentLinkedQueue可以在确认队列confirm callbacks与发布线程之间进行消息的传递。

处理方式:

记录要发送的全部消息;

在发布成功确认处删除;

打印未确认的消息。

使用一个哈希表存储消息,它的优点:

可以将需要和消息进行关联;轻松批量删除条目;支持高并发。

ConcurrentSkipListMap<Long,String > map = new ConcurrentSkipListMap<>();
public class AsyncProducerRemember {    private static final int MESSAGE_COUNT = 100;    private static final String QUEUE_NAME = "confirm_async_remember";    public static void main(String[] args) throws Exception {        Channel channel = RabbitMQUtils.getChannel();        // 产生队列        channel.queueDeclare(QUEUE_NAME,true,false,false,null);        // 开启确认发布        channel.confirmSelect();        // 线程安全有序的一个hash表,适用与高并发        ConcurrentSkipListMap< Long, String > map = new ConcurrentSkipListMap<>();        // 记录开始时间        long beginTime = System.currentTimeMillis();        // 确认成功回调        ConfirmCallback ackCallback = (deliveryTab, multiple) ->{            //2. 在发布成功确认处删除;            // 批量删除            if (multiple){                ConcurrentNavigableMap<Long, String> confirmMap = map.headMap(deliveryTab);                confirmMap.clear();            }else {                // 单独删除                map.remove(deliveryTab);            }            System.out.println("确认成功消息:" + deliveryTab);        };        // 确认失败回调        ConfirmCallback nackCallback = (deliveryTab,multiple) ->{            // 3. 打印未确认的消息。            System.out.println("未确认的消息:" + map.get(deliveryTab) + ",标记:" + deliveryTab);        };        // 消息监听器                channel.addConfirmListener(ackCallback,nackCallback);        for (int i = 0; i < MESSAGE_COUNT; i++) {            String msg = "" + i;            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));            // 1. 记录要发送的全部消息;            map.put(channel.getNextPublishSeqNo(),msg);        }        // 记录结束时间        long endTime = System.currentTimeMillis();        System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");    }}

以上是“Java RabbitMQ如何实现持久化和发布确认”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注编程网行业资讯频道!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯