文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Java实现Redis延时消息队列

2024-04-02 19:55

关注

什么是延时任务

延时任务,顾名思义,就是延迟一段时间后才执行的任务。举个例子,假设我们有个发布资讯的功能,运营需要在每天早上7点准时发布资讯,但是早上7点大家都还没上班,这个时候就可以使用延时任务来实现资讯的延时发布了。只要在前一天下班前指定第二天要发送资讯的时间,到了第二天指定的时间点资讯就能准时发出去了。如果大家有运营过公众号,就会知道公众号后台也有文章定时发送的功能。总而言之,延时任务的使用还是很广泛的。

延时任务的特点

实现思路:

将整个Redis当做消息池,以kv形式存储消息,key为id,value为具体的消息body
使用ZSET做优先队列,按照score维持优先级(用当前时间+需要延时的时间作为score)
轮询ZSET,拿出score比当前时间戳大的数据(已过期的)
根据id拿到消息池的具体消息进行消费
消费成功,删除改队列和消息
消费失败,让该消息重新回到队列

代码实现

1.消息模型


import lombok.Data;
import lombok.experimental.Accessors;

import javax.validation.constraints.NotNull;
import java.io.Serializable;


@Data
@Accessors(chain = true)
public class RedisMessage implements Serializable {

    
    private String group;

    
    private String id;

    
    @NotNull(message = "消息延时时间不能为空")
    private long delay;

    
    @NotNull(message = "消息存活时间不能为空")
    private int ttl;
    
    private Object body;
    
    private long createTime;
}

2.RedisMq 消息队列实现类


package com.shixun.base.redisMq;

import com.shixun.base.jedis.service.RedisService;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;


@Component
public class RedisMq {

    
    public static final String MSG_POOL = "Message:Pool:";

    
    public static final String QUEUE_NAME = "Message:Queue:";

//    private static final int SEMIH = 30 * 60;


    @Resource
    private RedisService redisService;

    
    public boolean addMsgPool(RedisMessage message) {
        if (null != message) {
            redisService.set(MSG_POOL + message.getGroup() + message.getId(), message, message.getTtl());
            return true;
        }
        return false;
    }

    
    public void deMsgPool(String group, String id) {
        redisService.remove(MSG_POOL + group + id);
    }

    
    public void enMessage(String key, long score, String val) {
        redisService.zsset(key, val, score);
    }

    
    public boolean deMessage(String key, String id) {
        return redisService.zdel(key, id);
    }
}

3.消息生产者


import cn.hutool.core.convert.Convert;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.IdUtil;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.validation.annotation.Validated;

import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;


@Component
public class MessageProvider {

    static Logger logger = LoggerFactory.getLogger(MessageProvider.class);

    @Resource
    private RedisMq redisMq;

    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");


    public boolean sendMessage(@Validated RedisMessage message) {
        Assert.notNull(message);
        //The priority is if there is no creation time
//        message.setCreateTime(System.currentTimeMillis());
        message.setId(IdUtil.fastUUID());
        Long delayTime = message.getCreateTime() + Convert.convertTime(message.getDelay(), TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
        try {
            redisMq.addMsgPool(message);
            redisMq.enMessage(RedisMq.QUEUE_NAME+message.getGroup(), delayTime, message.getId());
            logger.info("RedisMq发送消费信息{},当前时间:{},消费时间预计{}",message.toString(),new Date(),sdf.format(delayTime));
        }catch (Exception e){
            e.printStackTrace();
            logger.error("RedisMq 消息发送失败,当前时间:{}",new Date());
            return false;
        }
        return true;
    }
}

4.消息消费者



@Component
public class RedisMqConsumer {

    private static final Logger log = LoggerFactory.getLogger(RedisMqConsumer.class);

    @Resource
    private RedisMq redisMq;

    @Resource
    private RedisService redisService;

    @Resource
    private MessageProvider provider;

    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");


    //@Scheduled(cron = "*/1 * * * * ? ")
    
    public void baseMonitor(RedisMqExecute mqExecute){
        String queueName = RedisMq.QUEUE_NAME+mqExecute.getQueueName();
        //The query is currently expired
        Set<Object> set = redisService.rangeByScore(queueName, 0, System.currentTimeMillis());
        if (null != set) {
            long current = System.currentTimeMillis();
            for (Object id : set) {
                long  score = redisService.getScore(queueName, id.toString()).longValue();
                //Once again the guarantee has expired , And then perform the consumption
                if (current >= score) {
                    String str = "";
                    RedisMessage message = null;
                    String msgPool = RedisMq.MSG_POOL+mqExecute.getQueueName();
                    try {
                        message = (RedisMessage)redisService.get(msgPool + id.toString());
                        log.debug("RedisMq:{},get RedisMessage success now Time:{}",str,sdf.format(System.currentTimeMillis()));
                        if(null==message){
                            return;
                        }
                        //Do something ; You can add a judgment here and if it fails you can add it to the queue again
                        mqExecute.execute(message);
                    } catch (Exception e) {
                        e.printStackTrace();
                        //If an exception occurs, it is put back into the queue
                        // todo:  If repeated, this can lead to repeated cycles
                        log.error("RedisMq: RedisMqMessage exception ,It message rollback , If repeated, this can lead to repeated cycles{}",new Date());
                        provider.sendMessage(message);
                    } finally {
                        redisMq.deMessage(queueName, id.toString());
                        redisMq.deMsgPool(message.getGroup(),id.toString());
                    }
                }
            }
        }
    }
}

5. 消息执接口





public interface RedisMqExecute {

    
    public String getQueueName();


    
    public boolean execute(RedisMessage message);


    

    public void   threadPolling();

}

6. 任务类型的实现类:可以根据自己的情况去实现对应的队列需求 



@Service
public class OrderMqExecuteImpl implements RedisMqExecute {


    private static Logger logger = LoggerFactory.getLogger(OrderMqExecuteImpl.class);

    public final static String name = "orderPoll:";

    @Resource
    private RedisMqConsumer redisMqConsumer;

    private RedisMqExecute mqExecute = this;

    @Resource
    private OrderService orderService;


    @Override
    public String getQueueName() {
        return name;
    }

    @Override
    
    public boolean execute(RedisMessage message) {
        logger.info("Do orderMqPoll ; Time:{}",new Date());
  //Do 
        return true;
    }

    @Override
    
    public void threadPolling() {
        ThreadUtil.execute(() -> {
            while (true) {
                redisMqConsumer.baseMonitor(mqExecute);
                ThreadUtil.sleep(5, TimeUnit.MICROSECONDS);
            }
        });
    }
}

使用事例
 1. 实现RedisMqExecute 接口 创建对应的轮询或者采取定时器的方式执行 和实现具体的任务。
 2.  通过MessageProvider 实现相对应的消息服务和绑定队列组,通过队列组的方式执行。
 3. 提示: 采取线程的方式需要在项目启动过程中执行,采取定时器或者调度的方式可以更加动态的调整。

到此这篇关于Java实现Redis延时消息队列的文章就介绍到这了,更多相关Java Redis延时消息队列内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯