RocketMQ-消息消费模式 顺序消费
集群模式
在消费模式为集群的情况下,如果机器是集群的,消息只会给集群中的其中一台机器消费到
集群模式的演示(本身就默认)
假设我们生产者生产了十条信息 ,当我们集群了两台消费者服务器的时候,就会每个服务器执行五条
Rocketmq存储队列
在消息中间件每个topic是有4个写和读队列,主要是解决并发性能的问题的
如果只有一个队列,保证线程安全,必须得给队列进行写操作的时候上锁。
多几个队列,降低并发度,等待时间就短一些。
为什么是四个队列?
因为大多数服务器只有四核,意味着同时最多只能有CPU同时工作
广播模式
在消费模式为集群的情况下,如果机器是集群的,消费是会给集群中的所有机器所消费到
public class Consumer { public static void main(String[] args) throws Exception { //定义消息消费者(在同一个JVM中,消费者的组名不能重复) DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("helloConsumerGroup"); //设置nameServer地址 consumer.setNamesrvAddr("43.143.161.59:9876"); //设置订阅的主题 consumer.subscribe("helloTopic","*"); //设置消费模式 consumer.setMessageModel(MessageModel.BROADCASTING); //设置消息的监听器 consumer.setMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for(MessageExt msg:list){ String s = new String(msg.getBody(), Charset.defaultCharset()); System.out.println("线程:"+Thread.currentThread()+",消息的内容:"+s); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //启动消费者 consumer.start(); }}
运行结果:
生产者发送了十条消息之后,消费者集群的每个服务器均收到十条数据
实现生产顺序:12345消费顺序12345
哪些消息要实现顺序消费,就要让那些消息进入到同一个队列当中,对于消费者来说,一个队列对于一个线程
假设我们没有实现顺序消费的时候
创建生产者
创建实体类
@Setter@Getterpublic class OrderStep { private long orderId; private String desc; @Override public String toString() { return "OrderStep{" + "orderId=" + orderId + ", desc='" + desc + '\'' + '}'; }}
创建测试类
public class OrderUtil { public static List buildOrders(){ List orderList = new ArrayList(); OrderStep orderDemo = new OrderStep(); orderDemo.setOrderId(15103111039L); orderDemo.setDesc("创建"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111065L); orderDemo.setDesc("创建"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111039L); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103117235L); orderDemo.setDesc("创建"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111065L); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103117235L); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111065L); orderDemo.setDesc("完成"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111039L); orderDemo.setDesc("推送"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103117235L); orderDemo.setDesc("完成"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111039L); orderDemo.setDesc("完成"); orderList.add(orderDemo); return orderList; }}
创建生产者类
public class Producer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("orderlyProducerGroup"); producer.setNamesrvAddr("43.143.161.59:9876"); producer.start(); String topic = "orderTopic"; List orderSteps = OrderUtil.buildOrders(); for(OrderStep step:orderSteps){ Message msg = new Message(topic,step.toString().getBytes(Charset.defaultCharset())); producer.sendOneway(msg); } producer.shutdown(); }}
创建消费者类
public class Consumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderlyConsumerGroup"); consumer.setNamesrvAddr("43.143.161.59:9876"); consumer.subscribe("orderTopic","*"); consumer.setMessageModel(MessageModel.BROADCASTING); consumer.setMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for(MessageExt msg:list){ String s = new String(msg.getBody(), Charset.defaultCharset()); System.out.println("线程:"+Thread.currentThread()+",消息的内容:"+s); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //启动消费者 consumer.start(); }}
运行结果:
可以看出和我们生产数据的顺序完全不同,整个订单的顺序都反了
如何改实现顺序消费
生产者类
public class Producer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("orderlyProducerGroup"); producer.setNamesrvAddr("43.143.161.59:9876"); producer.start(); String topic = "orderTopic"; List orderSteps = OrderUtil.buildOrders(); //设置队列选择器 MessageQueueSelector selector = new MessageQueueSelector() { @Override public MessageQueue select(List list, Message message, Object o) { System.out.println("队列个数"+list.size()); Long orderId = (Long) o; int index = (int)(orderId % list.size()); return list.get(index); } }; for(OrderStep step:orderSteps){ Message msg = new Message(topic,step.toString().getBytes(Charset.defaultCharset())); //指定消息选择器,换入的参数 producer.send(msg,selector,step.getOrderId()); } producer.shutdown(); }}
消费者类
public class Consumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderlyConsumerGroup"); consumer.setNamesrvAddr("43.143.161.59:9876"); consumer.subscribe("orderTopic","*"); //从什么地方开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //一个队列对应一个线程 consumer.setMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List list, ConsumeOrderlyContext consumeOrderlyContext) { for(MessageExt msg:list){ System.out.println("当前线程:"+Thread.currentThread()+":,队列ID"+msg.getQueueId()+",消息内容:"+new String(msg.getBody(),Charset.defaultCharset())); } return ConsumeOrderlyStatus.SUCCESS; } }); //启动消费者 consumer.start(); }}
来源地址:https://blog.csdn.net/m0_62434717/article/details/128988855