文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

RocketMQ-消息消费模式 顺序消费

2023-08-16 18:36

关注

集群模式

在消费模式为集群的情况下,如果机器是集群的,消息只会给集群中的其中一台机器消费到
在这里插入图片描述
在这里插入图片描述

集群模式的演示(本身就默认)

假设我们生产者生产了十条信息 ,当我们集群了两台消费者服务器的时候,就会每个服务器执行五条

在这里插入图片描述在这里插入图片描述

Rocketmq存储队列

在消息中间件每个topic是有4个写和读队列,主要是解决并发性能的问题的
如果只有一个队列,保证线程安全,必须得给队列进行写操作的时候上锁。
多几个队列,降低并发度,等待时间就短一些。

为什么是四个队列?

因为大多数服务器只有四核,意味着同时最多只能有CPU同时工作
在这里插入图片描述

广播模式

在消费模式为集群的情况下,如果机器是集群的,消费是会给集群中的所有机器所消费到

public class Consumer {    public static void main(String[] args) throws Exception {        //定义消息消费者(在同一个JVM中,消费者的组名不能重复)        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("helloConsumerGroup");        //设置nameServer地址        consumer.setNamesrvAddr("43.143.161.59:9876");        //设置订阅的主题        consumer.subscribe("helloTopic","*");        //设置消费模式        consumer.setMessageModel(MessageModel.BROADCASTING);        //设置消息的监听器        consumer.setMessageListener(new MessageListenerConcurrently() {            @Override            public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {                for(MessageExt msg:list){                    String s = new String(msg.getBody(), Charset.defaultCharset());                    System.out.println("线程:"+Thread.currentThread()+",消息的内容:"+s);                }                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;            }        });        //启动消费者        consumer.start();    }}

运行结果:

生产者发送了十条消息之后,消费者集群的每个服务器均收到十条数据
在这里插入图片描述
在这里插入图片描述


实现生产顺序:12345消费顺序12345
哪些消息要实现顺序消费,就要让那些消息进入到同一个队列当中,对于消费者来说,一个队列对于一个线程

假设我们没有实现顺序消费的时候

创建生产者

创建实体类

@Setter@Getterpublic class OrderStep {    private long orderId;    private String desc;    @Override    public String toString() {        return "OrderStep{" +                "orderId=" + orderId +                ", desc='" + desc + '\'' +                '}';    }}

创建测试类

public class OrderUtil {    public static List buildOrders(){        List orderList = new ArrayList();        OrderStep orderDemo = new OrderStep();        orderDemo.setOrderId(15103111039L);        orderDemo.setDesc("创建");        orderList.add(orderDemo);        orderDemo = new OrderStep();        orderDemo.setOrderId(15103111065L);        orderDemo.setDesc("创建");        orderList.add(orderDemo);        orderDemo = new OrderStep();        orderDemo.setOrderId(15103111039L);        orderDemo.setDesc("付款");        orderList.add(orderDemo);        orderDemo = new OrderStep();        orderDemo.setOrderId(15103117235L);        orderDemo.setDesc("创建");        orderList.add(orderDemo);        orderDemo = new OrderStep();        orderDemo.setOrderId(15103111065L);        orderDemo.setDesc("付款");        orderList.add(orderDemo);        orderDemo = new OrderStep();        orderDemo.setOrderId(15103117235L);        orderDemo.setDesc("付款");        orderList.add(orderDemo);        orderDemo = new OrderStep();        orderDemo.setOrderId(15103111065L);        orderDemo.setDesc("完成");        orderList.add(orderDemo);        orderDemo = new OrderStep();        orderDemo.setOrderId(15103111039L);        orderDemo.setDesc("推送");        orderList.add(orderDemo);        orderDemo = new OrderStep();        orderDemo.setOrderId(15103117235L);        orderDemo.setDesc("完成");        orderList.add(orderDemo);        orderDemo = new OrderStep();        orderDemo.setOrderId(15103111039L);        orderDemo.setDesc("完成");        orderList.add(orderDemo);        return orderList;    }}

创建生产者类

public class Producer {        public static void main(String[] args) throws Exception {            DefaultMQProducer producer = new DefaultMQProducer("orderlyProducerGroup");            producer.setNamesrvAddr("43.143.161.59:9876");            producer.start();            String topic = "orderTopic";            List orderSteps = OrderUtil.buildOrders();            for(OrderStep step:orderSteps){                Message msg = new Message(topic,step.toString().getBytes(Charset.defaultCharset()));                producer.sendOneway(msg);            }            producer.shutdown();        }}

创建消费者类

public class Consumer {    public static void main(String[] args) throws Exception {        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderlyConsumerGroup");        consumer.setNamesrvAddr("43.143.161.59:9876");        consumer.subscribe("orderTopic","*");        consumer.setMessageModel(MessageModel.BROADCASTING);        consumer.setMessageListener(new MessageListenerConcurrently() {            @Override            public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {                for(MessageExt msg:list){                    String s = new String(msg.getBody(), Charset.defaultCharset());                    System.out.println("线程:"+Thread.currentThread()+",消息的内容:"+s);                }                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;            }        });        //启动消费者        consumer.start();    }}

运行结果:

可以看出和我们生产数据的顺序完全不同,整个订单的顺序都反了
在这里插入图片描述

如何改实现顺序消费

生产者类

public class Producer {    public static void main(String[] args) throws Exception {        DefaultMQProducer producer = new DefaultMQProducer("orderlyProducerGroup");        producer.setNamesrvAddr("43.143.161.59:9876");        producer.start();        String topic = "orderTopic";        List orderSteps = OrderUtil.buildOrders();        //设置队列选择器        MessageQueueSelector selector = new MessageQueueSelector() {            @Override            public MessageQueue select(List list, Message message, Object o) {                System.out.println("队列个数"+list.size());                Long orderId = (Long) o;                int index = (int)(orderId % list.size());                return list.get(index);            }        };        for(OrderStep step:orderSteps){            Message msg = new Message(topic,step.toString().getBytes(Charset.defaultCharset()));            //指定消息选择器,换入的参数            producer.send(msg,selector,step.getOrderId());        }        producer.shutdown();    }}

消费者类

public class Consumer {    public static void main(String[] args) throws Exception {        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderlyConsumerGroup");        consumer.setNamesrvAddr("43.143.161.59:9876");        consumer.subscribe("orderTopic","*");        //从什么地方开始消费        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);        //一个队列对应一个线程        consumer.setMessageListener(new MessageListenerOrderly() {            @Override            public ConsumeOrderlyStatus consumeMessage(List list, ConsumeOrderlyContext consumeOrderlyContext) {                for(MessageExt msg:list){                    System.out.println("当前线程:"+Thread.currentThread()+":,队列ID"+msg.getQueueId()+",消息内容:"+new String(msg.getBody(),Charset.defaultCharset()));                }                return ConsumeOrderlyStatus.SUCCESS;            }        });        //启动消费者        consumer.start();    }}

来源地址:https://blog.csdn.net/m0_62434717/article/details/128988855

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯