文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

RabbitMQ是如何确定消息是否投递到队列中的

2024-12-10 15:58

关注

1. 前言

在使用RabbitMQ消息中间件时,因为消息的投递是异步的,默认情况下,RabbitMQ会删除那些无法路由的消息。为了能够检出消息是否顺利投递到队列,我们需要相应的处理机制。今天就来验证一下相关的验证机制。

2. 消息投递失败

那么哪些情况消息会投递失败呢?RabbitMQ消息会先到达指定的交换机,然后由交换机路由到对应的队列。所以以下几种情况会导致消息投递失败。

3. 投递失败的处理机制

对应上面的两种情况,RabbitMQ提供了对应的解决方案。

ConfirmCallback

RabbitMQ提供了ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后进行确认回调。

在Spring Boot中需要开启:

  1. spring: 
  2.   rabbitmq: 
  3.   # 通常选择 correlated 
  4.     publisher-confirm-type: 

通常有三种选择:

这里我使用CORRELATED模式,声明一个ConfirmCallback并设置到RabbitTemplate中

  1. rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { 
  2.     // correlationData 可能为空 
  3.     if (ack) { 
  4.         log.debug("消息发送到exchange成功,id: {}", correlationData.getId()); 
  5.     } else { 
  6.         log.debug("消息发送到exchange失败,原因: {}", cause); 
  7.     } 
  8. }); 

当消息投递到一个不存在的交换机Exchange且ack=false时会输出日志:

  1. - Publishing message [(Body:'"hello"' MessageProperties [headers={spring_listener_return_correlation=a088eb3f-a234-4e15-bb7a-3aa9a6f043e6, spring_returned_message_correlation=29975bc1-f363-4e3a-85ca-010d13888720, __TypeId__=java.lang.String}, contentType=application/json, contentEncoding=UTF-8, contentLength=7, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])] on exchange [DIRECT_EXCHANGE1], routingKey = [DIRECT_ROUTING_KEY2] 
  2.  
  3. - 消息发送到exchange失败,原因: channel error; protocol method: #methodclose>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXCHANGE1' in vhost 'my_vhost', class-id=60, method-id=40) 

这里实现的比较简单你可以增加一些消息投递到交换机失败后的操作处理逻辑。

ReturnCallback

ReturnCallback接口用于实现消息已经成功发送到RabbitMQ交换机,但没有匹配到队列时的回调。

在Spring Boot中需要同时开启:

  1. spring: 
  2.   rabbitmq: 
  3.     publisher-returnstrue 
  4.     template: 
  5.       mandatory: true 

RabbitTemplate中的mandatory设置值优先级要高一些。

我们声明一个ReturnCallback并设置到RabbitTemplate中

  1. rabbitTemplate.setMandatory(true); 
  2. rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { 
  3.     String correlationId = message.getMessageProperties() 
  4.             .getHeader(PublisherCallbackChannel.RETURNED_MESSAGE_CORRELATION_KEY); 
  5.     log.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {}  路由键: {}", correlationId, 
  6.             replyCode, replyText, exchange, routingKey); 
  7. }); 

当消息成功投递到交换机但是无法匹配到队列时:

  1. - Publishing message [(Body:'"hello"' MessageProperties [headers={spring_listener_return_correlation=155648bd-fc3e-4c8b-a650-7b1ce720c7a6, spring_returned_message_correlation=7029ee49-357a-42fc-8532-dc41b4bb8e87, __TypeId__=java.lang.String}, contentType=application/json, contentEncoding=UTF-8, contentLength=7, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])] on exchange [DIRECT_EXCHANGE], routingKey = [DIRECT_ROUTING_KEY2] 
  2.  
  3. - 消息:7029ee49-357a-42fc-8532-dc41b4bb8e87 发送失败, 应答码:312 原因:NO_ROUTE 交换机: DIRECT_EXCHANGE  路由键: DIRECT_ROUTING_KEY2 
  4. - 消息发送到exchange成功,id: 7029ee49-357a-42fc-8532-dc41b4bb8e87 

从上面我们也可以看出ReturnCallback只处理投递到队列失败的情况,并不像ConfirmCallback既能处理失败的情况也能处理成功的情况。

4. 总结

 

消息投递失败的处理在使用RabbitMQ的使用中时非常必要的,能够帮助我们追踪消息的投递情况,以及处理消息投递异常或者成功后的逻辑处理,为消息丢失进行一些兜底或者记录。但是请注意这个并不是发生在消费阶段,是否成功消费并不是由这两种回调来处理,我们有空再对消息的消费确认进行讲解。

本文转载自微信公众号「码农小胖哥」,可以通过以下二维码关注。转载本文请联系码农小胖哥公众号。

 

来源:码农小胖哥内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯