应用场景
在实现RocketMQ消费时,一般会用到@RocketMQMessageListener注解定义Group、Topic以及selectorExpression(数据过滤、选择的规则)为了能支持动态筛选数据,一般都会使用表达式,然后通过apollo或者cloud config进行动态切换。
引入依赖
<!-- RocketMq Spring Boot Starter-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
消费者代码
@RocketMQMessageListener(consumerGroup = "${rocketmq.group}",topic ="${rocketmq.topic}",selectorExpression = "${rocketmq.selectorExpression}")
public class Consumer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("消费到的数据为:"+s);
}
}
问题排查
RocketMQMessageListener整个注解默认selectorExpression为*,表示接收当前Topic下的所有数据,如果我们想对tags进行动态配置,在使用${rocketmq.selectorExpression}表达式时会发现所有数据全被过滤了,跟踪源码(ListenerContainerConfiguration.java)发现在创建listener时selectorExpression的数据在通environment环境变量中获取对应的数据后又被覆盖了,导致整个过滤条件被变更为表达式。
@Override
public void afterSingletonsInstantiated() {
// 获取所有所有使用了RocketMQMessageListener注解的bean
Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
if (Objects.nonNull(beans)) {
// 循环注册容器
beans.forEach(this::registerContainer);
}
}
private void registerContainer(String beanName, Object bean) {
Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
// 校验当前bean是否实现了RocketMQListener接口
if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName());
}
// 获取bean上的annotation
RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
// 解析group及topic,可支持表达式
String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());
String topic = this.environment.resolvePlaceholders(annotation.topic());
boolean listenerEnabled =
(boolean)rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
.getOrDefault(topic, true);
if (!listenerEnabled) {
log.debug(
"Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.",
consumerGroup, topic);
return;
}
validate(annotation);
String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
counter.incrementAndGet());
GenericApplicationContext genericApplicationContext = (GenericApplicationContext)applicationContext;
// 注册bean的,调用createRocketMQListenerContainer
genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
() -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
DefaultRocketMQListenerContainer.class);
if (!container.isRunning()) {
try {
container.start();
} catch (Exception e) {
log.error("Started container failed. {}", container, e);
throw new RuntimeException(e);
}
}
log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
}
private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,
RocketMQMessageListener annotation) {
DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();
container.setRocketMQMessageListener(annotation);
String nameServer = environment.resolvePlaceholders(annotation.nameServer());
nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer;
String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());
container.setNameServer(nameServer);
if (!StringUtils.isEmpty(accessChannel)) {
container.setAccessChannel(AccessChannel.valueOf(accessChannel));
}
container.setTopic(environment.resolvePlaceholders(annotation.topic()));
// 此处已经根据表达式将数据取出
String tags = environment.resolvePlaceholders(annotation.selectorExpression());
if (!StringUtils.isEmpty(tags)) {
container.setSelectorExpression(tags);
}
container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
// 此处将SelectorExpression的数据覆盖成了表达式
container.setRocketMQMessageListener(annotation);
container.setRocketMQListener((RocketMQListener)bean);
container.setObjectMapper(objectMapper);
container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
container.setName(name); // REVIEW ME, use the same clientId or multiple?
return container;
}
问题解决
因为ListenerContainerConfiguration类是实现了SmartInitializingSingleton接口的afterSingletonsInstantiated方法,我们可以通过反射对selectorExpression的数据在ListenerContainerConfiguration进行初始化前进行解析并赋值回去。
@Configuration
public class ChangeSelectorExpressionBeforeMQInit implements InitializingBean {
@Autowired
private ApplicationContext applicationContext;
@Autowired
private StandardEnvironment environment;
@Override
public void afterPropertiesSet() throws Exception {
Map<String,Object> beans =applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
for (Object bean : beans.values()){
Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
continue;
}
RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
InvocationHandler invocationHandler = Proxy.getInvocationHandler(annotation);
Field field = invocationHandler.getClass().getDeclaredField("memberValues");
field.setAccessible(true);
Map<String, Object> memberValues = (Map<String, Object>) field.get(invocationHandler);
for (Map.Entry<String,Object> entry: memberValues.entrySet()) {
if(Objects.nonNull(entry)){
memberValues.put(entry.getKey(),environment.resolvePlaceholders(String.valueOf(entry.getValue())));
}
}
}
}
}
初次之外,在2.1.0版本的依赖包中已经修复了此Bug,在不造成依赖冲突的前提下,建议使用2.1.0以上的版本包。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程网。