文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

RabbitMq消息防丢失功能实现方式讲解

2023-01-28 06:06

关注

1.概述

1.1.数据丢失的原因

在消息中有三种可能性造成数据丢失:

消费者消费消息失败:

RabbitMq存在应答机制,默认为自动应答,MQ向消费者推送一条消息,消费者收到这条消息后会返回一个ack(应答)给MQ,MQ收到应答后会删除这条消息。

自动应答存在一个问题,就是消费者收到消息后立马就会给MQ返回ack,如果消费者返回完ack但还没来的及真正处理这条消息时,消费者断电宕机了,那么这条消息就丢失了。

这就是由于消费者消费消息失败造成的数据丢失。

生产者生产数据失败:

生产者向MQ推送了一条消息,但是由于由于诸如网络故障等原因mq并没有收到该条消息,这样就造成了这条消息的丢失。

MQ数据丢失:

MQ的数据是存在内存中的,诸如断电等原因可能会造成数据的丢失。

1.2.如何防止数据丢失

解决以上列举的数据丢失问题的办法有三种:

手动应答:

RabbitMQ默认是自动应答,消费者收到消息后就会自动返回ack给MQ,可以将应答模式改为手动应答,在消费者一侧消息的消费动作完成后手动来返回ack给MQ,用来解决“消费者消费消息失败”问题。

消息确认机制:

当消息队列收到消息后,告知生产者,让生产者感知到自己生产的消息,消息队列已经接收到,用来解决“生产者生产消息失败”问题。消息确认机制有两种实现方式:

持久化:

消息队列的消息持久化到磁盘上,用来解决“MQ数据丢失”问题。

2.手动应答

手动应答是通过设置channel来实现的,以下为一个完整代码示例。

配置类:

@Configuration
public class config {
    @Bean
    public Queue queue(){
        return new Queue("queue_01",false);
    }
}

生产者:

@SpringBootTest(classes = Main.class)
public class Producer {
    @Autowired
    RabbitTemplate rabbitTemplate;
    @Test
    public void producerMsg(){
        rabbitTemplate.convertAndSend("queue_01","hello_world");
    }
}

消费者:

@Component
@Slf4j
public class Consumer {
    @RabbitListener(queues = {"queue_01"})
    public void consumerMsg(String msg, Message message,Channel channel){
        try {
            log.info("消费者消费消息: "+msg);
            
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        } catch (Exception e) {
            
            try {
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            } catch (Exception ex) {
                log.error(ex.getMessage());
            }
        }
    }
}

3.消息确认机制

AQMP事务、confirm其实都是基于channel的。

3.1.AMQP事务

AMQP事务和数据库事务类似,定义一组对MQ的操作,统一提交,成功则全部一起执行,失败则全部回滚。AMQP事务在spring boot中的使用很简单,和数据库的事务一样,一个注解就可以搞定。

@GetMapping("/direct/wx/transactional")
@Transactional(rollbackFor = Exception.class)
public String sendDirectMessageTransactional() {
  rabbitTemplate.convertAndSend("direct_exchange", "wx","hello world!");
  log.info("开启事务消息机制");
    try {
           Thread.sleep(5000);
       } catch (Exception e) {
            e.printStackTrace();
       }
      return "ok";
}

3.2.confirm

confirm是基于channel的,一旦channel进入confirm模式,所有在该channel上发布的消息都会被指派一个唯一的ID(从1开始),消息被投递道匹配队列后broker会发送一个确认消息给生产者。如果消息和队列是可持久化的(durable为true),那么确认消息会在消息被写入磁盘后发出。

confirm最大的好处在于异步,生产者在等待上一条消息的确认消息的时候可以继续往下发送。

confirm在spring boot中的使用很简单,在配置文件中开启即可,并且支持自定义回调函数:

配置文件:

spring.rabbitmq.publisher-confirms: true

spring.rabbitmq.publisher-returns: true

生产者:

@Slf4j
@Component
public class RabbitmqService implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void sendMessage(String exchange,String routingKey,Object msg) {
        // 设置交换机处理失败消息的模式     true 表示消息由交换机 到达不了队列时,会将消息重新返回给生产者
        // 如果不设置这个指令,则交换机向队列推送消息失败后,不会触发 setReturnCallback
        rabbitTemplate.setMandatory(true);
        //消息消费者确认收到消息后,手动ack回执
        rabbitTemplate.setConfirmCallback(this);
        // 暂时关闭 return 配置
        //rabbitTemplate.setReturnCallback(this);
        //发送消息
        rabbitTemplate.convertAndSend(exchange,routingKey,msg);
    }
    
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("---- returnedMessage ----replyCode="+replyCode+" replyText="+replyText+" ");
    }
    
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("---- confirm ----ack="+ack+"  cause="+String.valueOf(cause));
        log.info("correlationData -->"+correlationData.toString());
        if(ack){
            // 交换机接收到
            log.info("---- confirm ----ack==true  cause="+cause);
        }else{
            // 没有接收到
            log.info("---- confirm ----ack==false  cause="+cause);
        }
    }
}

到此这篇关于RabbitMq消息防丢失功能实现方式讲解的文章就介绍到这了,更多相关RabbitMq消息防丢失内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯