leftPush消息入队,rightPop对应,消息出队。
rightPop(RedisConstant.MQ_LIST, 0L, TimeUnit.SECONDS)阻塞出队,0表示永久阻塞
生产消息服务
@Service
public class RedisService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
public Object publish() {
OrderDTO dto = new OrderDTO();
dto.setId(1);
dto.setCreateTime(new Date());
dto.setMoney("12.34");
dto.setOrderNo("orderNo1");
String s = JSON.toJSONString(dto);
ListOperations<String, String> listOperations = redisTemplate.opsForList();
//leftPush和rightPop对应,左边入队,右边出队
listOperations.leftPush(RedisConstant.MQ_LIST, s);
//因为出队是阻塞读取的,所以上一步入队后,数据立刻就被驱走了,下一步size=0
Long size = listOperations.size(RedisConstant.MQ_LIST);
List<String> list = new ArrayList<>();
if (size != null && size > 0) {
list = listOperations.range(RedisConstant.MQ_LIST, 0, size - 1);
}
return list;
}
}
测试
@RestController
@RequestMapping("redisList")
public class RedisListController {
@Autowired
private RedisService redisService;
@GetMapping("publish")
public Object publish() {
return redisService.publish();
}
}
消费消息服务,定时任务
@Component
public class RedisConsumeTask {
@Autowired
private RedisService redisService;
@TaskLock(RedisConstant.CONSUME_REDIS_LIST)
@Scheduled(cron = "0/10 * * * * ?")
public void consumeMqList() {
redisService.consumeMqList();
}
}
@Service
@Slf4j
public class RedisService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
public void consumeMqList() {
ListOperations<String, String> listOperations = redisTemplate.opsForList();
//0时间,表示阻塞永久
//待机一小时后,再次发消息,消费不了了,阻塞有问题啊。还得轮寻啊
//String s = listOperations.rightPop(RedisConstant.MQ_LIST, 0L, TimeUnit.SECONDS);
String s = listOperations.rightPop(RedisConstant.MQ_LIST);
if (s == null) {
return;
}
log.info("{} = {}", RedisConstant.MQ_LIST, s);
OrderDTO dto = JSON.parseObject(s, OrderDTO.class);
log.info("dto = {}", dto);
}
}
日志
@Component
@Aspect
public class TaskLockAop {
@Autowired
private RedisLockRegistry redisLockRegistry;
@Around("execution(@TaskLock * * (..))")
public Object taskAround(ProceedingJoinPoint pjp) throws Throwable {
TaskLock taskAnnotation = ((MethodSignature)pjp.getSignature()).getMethod().getAnnotation(TaskLock.class);
String lockKey = taskAnnotation.value();
Lock lock = redisLockRegistry.obtain(lockKey);
try {
lock.tryLock(30L, TimeUnit.SECONDS);
System.out.println("任务开始, " + lockKey + ", " + new Date());
return pjp.proceed();
} finally {
lock.unlock();
System.out.println("任务结束, " + lockKey + ", " + new Date());
}
}
}
测试
http://localhost:9040/redisList/publish
["{“createTime”:1574394538430,“id”:1,“money”:“12.34”,“orderNo”:“orderNo1”}"]
下面一直阻塞,任务开始了,不收到消息,永远不会结束。
阻塞有问题,改用轮询了。
先启动发送消息服务,发送消息。后启动消费消息服务,可以消费消息。这一点,比发布订阅要稳定。
关联项目https://github.com/mingwulipo/cloud-demo.git
到此这篇关于redis用list做消息队列的实现示例的文章就介绍到这了,更多相关redis list消息队列内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!