文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

RocketMQ整合SpringBoot实现生产级二次封装

2024-04-02 19:55

关注

前言说明

前置掌握:SpringBoot基础使用、RocketMQ和SpringBoot的整合使用

文章难度:四颗星

代码不难,重点是封装的思想需要体会

不同观点欢迎大家在评论区一起讨论学习,没有对错之分,每个系统业务特性不同,适合系统的才是最好的~

源码地址:https://gitee.com/tianxincoder/practice-rocketmq-enterprise文章只会说明核心代码,其他的基础整合配置和多环境自动隔离参考源码即可

一、为什么要二次封装

为了不产生歧义,文章中提到的二次封装均是基于原始使用方式的封装,而非源码级别的二次封装

换句话说:如果都需要对源码进行封装了,那么说明公司业务规模都到一定程度了,二次封装这种东西已经不需要讨论了,封装已经是一个共识

1.1 二次封装不同观点

让我们以一个生活中的蛋炒饭开个头

原始框架好比提供了原材料:厨具、鸡蛋,米饭等食材、菜谱

问题:哪种方案更好? 答案:两种各有各的优势(在说废话,哈哈~)

1.2 封装的抽离点

1.3 设计模式的应用

二、二次封装核心要点

2.1 二次封装核心点

2.1.1 封装主要讨论点

2.1.2 发送/消费的几种消息实体

2.2 RocketMQTemplate封装

2.2.1 封装基础实体类

package com.codecoord.rocketmq.domain;

import lombok.Data;

import java.time.LocalDateTime;
import java.util.UUID;


@Data
public abstract class BaseMqMessage {
    
    protected String key;
    
    protected String source = "";
    
    protected LocalDateTime sendTime = LocalDateTime.now();
    
    protected String traceId = UUID.randomUUID().toString();
    
    protected Integer retryTimes = 0;
}

2.2.2 RocketMQTemplate

package com.codecoord.rocketmq.template;

import com.alibaba.fastjson.JSONObject;
import com.codecoord.rocketmq.constant.RocketMqSysConstant;
import com.codecoord.rocketmq.domain.BaseMqMessage;
import com.codecoord.rocketmq.util.JsonUtil;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;


@Component
public class RocketMqTemplate {
    private static final Logger LOGGER = LoggerFactory.getLogger(RocketMqTemplate.class);
    @Resource(name = "rocketMQTemplate")
    private RocketMQTemplate template;

    
    public RocketMQTemplate getTemplate() {
        return template;
    }

    
    public String buildDestination(String topic, String tag) {
        return topic + RocketMqSysConstant.DELIMITER + tag;
    }

    
    public <T extends BaseMqMessage> SendResult send(String topic, String tag, T message) {
        // 注意分隔符
        return send(topic + RocketMqSysConstant.DELIMITER + tag, message);
    }

    public <T extends BaseMqMessage> SendResult send(String destination, T message) {
        // 设置业务键,此处根据公共的参数进行处理
        // 更多的其它基础业务处理...
        Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
        SendResult sendResult = template.syncSend(destination, sendMessage);
        // 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集
        LOGGER.info("[{}]同步消息[{}]发送结果[{}]", destination, JsonUtil.toJson(message), JSONObject.toJSON(sendResult));
        return sendResult;
    }

    
    public <T extends BaseMqMessage> SendResult send(String topic, String tag, T message, int delayLevel) {
        return send(topic + RocketMqSysConstant.DELIMITER + tag, message, delayLevel);
    }

    public <T extends BaseMqMessage> SendResult send(String destination, T message, int delayLevel) {
        Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
        SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel);
        LOGGER.info("[{}]延迟等级[{}]消息[{}]发送结果[{}]", destination, delayLevel, JsonUtil.toJson(message), JsonUtil.toJson(sendResult));
        return sendResult;
    }
}

3.2.3 增强RocketMQTemplate

package com.codecoord.rocketmq.template;

import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import com.codecoord.rocketmq.domain.RocketMqEntityMessage;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.validation.constraints.NotNull;
import java.time.LocalDate;
import java.time.LocalDateTime;


@Component
public class OrderMessageTemplate extends RocketMqTemplate {
    /// 如果不采用继承也可以直接注入使用
    

    
    public SendResult sendOrderPaid(@NotNull String orderId, String body) {
        RocketMqEntityMessage message = new RocketMqEntityMessage();
        message.setKey(orderId);
        message.setSource("订单支付");
        message.setMessage(body);
        // 这两个字段只是为了测试
        message.setBirthday(LocalDate.now());
        message.setTradeTime(LocalDateTime.now());
        return send(RocketMqBizConstant.SOURCE_TOPIC, RocketMqBizConstant.ORDER_PAID_TAG, message);
    }
}

2.3 RocketMQListener封装

package com.codecoord.rocketmq.listener;

import com.codecoord.rocketmq.constant.RocketMqSysConstant;
import com.codecoord.rocketmq.domain.BaseMqMessage;
import com.codecoord.rocketmq.template.RocketMqTemplate;
import com.codecoord.rocketmq.util.JsonUtil;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import javax.annotation.Resource;
import java.time.Instant;
import java.util.Objects;


public abstract class BaseMqMessageListener<T extends BaseMqMessage> {
    
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Resource
    private RocketMqTemplate rocketMqTemplate;

    
    protected abstract String consumerName();

    
    protected abstract void handleMessage(T message) throws Exception;

    
    protected abstract void overMaxRetryTimesMessage(T message);
    
    protected boolean isFilter(T message) {
        return false;
    }

    
    protected abstract boolean isRetry();

    
    protected abstract boolean isThrowException();

    
    protected int maxRetryTimes() {
        return 10;
    }

    
    protected int retryDelayLevel() {
        return -1;
    }

    
    public void dispatchMessage(T message) {
        MDC.put(RocketMqSysConstant.TRACE_ID, message.getTraceId());
        // 基础日志记录被父类处理了
        logger.info("[{}]消费者收到消息[{}]", consumerName(), JsonUtil.toJson(message));
        if (isFilter(message)) {
            logger.info("消息不满足消费条件,已过滤");
            return;
        }
        // 超过最大重试次数时调用子类方法处理
        if (message.getRetryTimes() > maxRetryTimes()) {
            overMaxRetryTimesMessage(message);
            return;
        }
        try {
            long start = Instant.now().toEpochMilli();
            handleMessage(message);
            long end = Instant.now().toEpochMilli();
            logger.info("消息消费成功,耗时[{}ms]", (end - start));
        } catch (Exception e) {
            logger.error("消息消费异常", e);
            // 是捕获异常还是抛出,由子类决定
            if (isThrowException()) {
                throw new RuntimeException(e);
            }
            if (isRetry()) {
                // 获取子类RocketMQMessageListener注解拿到topic和tag
                RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class);
                if (Objects.nonNull(annotation)) {
                    message.setSource(message.getSource() + "消息重试");
                    message.setRetryTimes(message.getRetryTimes() + 1);
                    SendResult sendResult;
                    try {
                        // 如果消息发送不成功,则再次重新发送,如果发送异常则抛出由MQ再次处理(异常时不走延迟消息)
                        // 此处捕获之后,相当于此条消息被消息完成然后重新发送新的消息
                        sendResult = rocketMqTemplate.send(annotation.topic(), annotation.selectorExpression(), message, retryDelayLevel());
                    } catch (Exception ex) {
                        throw new RuntimeException(ex);
                    }
                    // 发送失败的处理就是不进行ACK,由RocketMQ重试
                    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                        throw new RuntimeException("重试消息发送失败");
                    }
                }
            }
        }
    }
}
package com.codecoord.rocketmq.listener;

import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import com.codecoord.rocketmq.domain.RocketMqEntityMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;


@Slf4j
@Component
@RocketMQMessageListener(
        topic = RocketMqBizConstant.SOURCE_TOPIC,
        consumerGroup = RocketMqBizConstant.SOURCE_GROUP,
        selectorExpression = RocketMqBizConstant.SOURCE_TAG,
        // 指定消费者线程数,默认64,生产中请注意配置,避免过大或者过小
        consumeThreadMax = 5
)
public class RocketEntityMessageListener extends BaseMqMessageListener<RocketMqEntityMessage>
                                         implements RocketMQListener<RocketMqEntityMessage> {
    
    @Override
    protected String consumerName() {
        return "RocketMQ二次封装消息消费者";
    }

    @Override
    public void onMessage(RocketMqEntityMessage message) {
        // 注意,此时这里没有直接处理业务,而是先委派给父类做基础操作,然后父类做完基础操作后会调用子类的实际处理类型
        super.dispatchMessage(message);
    }

    @Override
    protected void handleMessage(RocketMqEntityMessage message) throws Exception {
        // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
        System.out.println("业务消息处理");
    }

    @Override
    protected void overMaxRetryTimesMessage(RocketMqEntityMessage message) {
        // 当超过指定重试次数消息时此处方法会被调用
        // 生产中可以进行回退或其他业务操作
    }

    @Override
    protected boolean isRetry() {
        return false;
    }

    @Override
    protected int maxRetryTimes() {
        // 指定需要的重试次数,超过重试次数overMaxRetryTimesMessage会被调用
        return 5;
    }

    @Override
    protected boolean isThrowException() {
        // 是否抛出异常,到消费异常时是被父类拦截处理还是直接抛出异常
        return false;
    }
}

2.4 广播消息的应用场景

package com.codecoord.rocketmq.listener;

import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;


@Slf4j
@Component
@RocketMQMessageListener(
        topic = RocketMqBizConstant.SOURCE_TOPIC,
        consumerGroup = RocketMqBizConstant.SOURCE_BROADCASTING_GROUP,
        selectorExpression = RocketMqBizConstant.SOURCE_BROADCASTING_TAG
        // messageModel = MessageModel.BROADCASTING
)
public class RocketBroadcastingListener implements RocketMQListener<MessageExt> {

    
    @Override
    public void onMessage(MessageExt message) {
        log.info("收到广播消息【{}】", new String(message.getBody()));
    }
}

2.3 代码封装完结测试

封装测试大家可以直接参考RocketMqController即可

package com.codecoord.rocketmq.controller;

import com.alibaba.fastjson.JSONObject;
import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import com.codecoord.rocketmq.domain.RocketMqEntityMessage;
import com.codecoord.rocketmq.template.OrderMessageTemplate;
import com.codecoord.rocketmq.template.RocketMqTemplate;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.UUID;


@RestController
@RequestMapping("/rocketmq")
@Slf4j
public class RocketMqController {
    
    @Resource
    private RocketMqTemplate rocketMqTemplate;
    
    @Resource
    private OrderMessageTemplate orderMessageTemplate;

    
    @RequestMapping("/entity/message")
    public Object sendMessage() {
        RocketMqEntityMessage message = new RocketMqEntityMessage();
        // 设置业务key
        message.setKey(UUID.randomUUID().toString());
        // 设置消息来源,便于查询we年
        message.setSource("封装测试");
        // 业务消息内容
        message.setMessage("当前消息发送时间为:" + LocalDateTime.now());
        // Java时间字段需要单独处理,否则会序列化失败
        message.setBirthday(LocalDate.now());
        message.setTradeTime(LocalDateTime.now());
        return rocketMqTemplate.send(RocketMqBizConstant.SOURCE_TOPIC, RocketMqBizConstant.SOURCE_TAG, message);
    }

    
    @RequestMapping("/order/paid")
    public Object sendOrderPaidMessage() {
        return orderMessageTemplate.sendOrderPaid(UUID.randomUUID().toString(), "客户下单了...,快快备货");
    }

    
    @RequestMapping("/messageExt/message")
    public SendResult convertAndSend() {
        // 生产中不推荐使用jsonObject传递,不看发送者无法知道传递的消息包含什么信息
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("type", "messageExt");
        String destination = rocketMqTemplate.buildDestination(RocketMqBizConstant.SOURCE_TOPIC, RocketMqBizConstant.SOURCE_BROADCASTING_TAG);
        // 如果要走内部方法发送则必须要按照标准来,否则就使用原生的消息发送
        return rocketMqTemplate.getTemplate().syncSend(destination, jsonObject);
    }
}

到此这篇关于RocketMQ整合SpringBoot实现生产级二次封装的文章就介绍到这了,更多相关RocketMQ SpringBoot封装内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯