文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

使用Redis怎么实现延迟队列

2023-06-15 02:05

关注

本篇文章给大家分享的是有关使用Redis怎么实现延迟队列,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

方案一:

采用通过定时任务采用数据库/非关系型数据库轮询方案。

优点:

实现简单,对于项目前期这样是最容易的解决方案。

缺点:

DB 有效使用率低,需要将一部分的数据库的QPS分配给 JOB 的无效轮询。

服务资源浪费,因为轮询需要对所有的数据做一次 SCAN 扫描 JOB 服务的资源开销很大。

方案二:

采用延迟队列:

优点:

服务的资源使用率较高,能够精确的实现超时任务的执行。

减少 DB 的查询次数,能够降低数据库的压力

缺点:

对于延迟队列来说本身设计比较复杂,目前没有通用的比较好过的方案。

基于 Redis 的延迟队列实现

基于以上的分析,我决定通过 Redis 来实现分布式队列。

设计思路:

使用Redis怎么实现延迟队列

第一步将需要发放的消息发送到延迟队列中。

延迟队列将数据存入 Redis 的 ZSet 有序集合中score 为当前时间戳,member 存入需要发送的数据。

添加一个 schedule 来进行对 Redis 有序队列的轮询。

如果到达达到消息的执行时间,那么就进行业务的执行。

如果没有达到消息的执行是将,那么消息等待下轮执行。

实现步骤:

由于本处篇幅有限,所以只列举部分代码,完整的代码可以在本文最后访问 GitHub 获取。由于本人阅历/水平有限,如有建议/或更正欢迎留言或提问。先在此谢谢大家驻足阅读 ? ? ?。

需要注意的问题:

单个 Redis 命令的执行是原子性的,但 Redis 没有在事务上增加任何维持原子性的机制,所以 Redis 事务的执行并不是原子性的。

事务可以理解为一个打包的批量执行脚本,但批量指令并非原子化的操作,中间某条指令的失败不会导致前面已做指令的回滚,也不会造成后续的指令不做。

我们可以通过 Redis 的 eval 命令来执行 lua 脚本来保证原子性实现Redis的事务。

实现步骤如下:

延迟队列接口

public interface RedisDelayQueue<E extends DelayMessage> {    String META_TOPIC_WAIT = "delay:meta:topic:wait";    String META_TOPIC_ACTIVE = "delay:meta:topic:active";    String TOPIC_ACTIVE = "delay:active:9999";        void poll();        void push(E e);}

延迟队列消息

@Setter@Getterpublic class DelayMessage {        private String id;        private String topic = "default";        private String body;        private Long delayTime = System.currentTimeMillis() + 30000L;        private LocalDateTime createTime;}

延迟队列实现

@Componentpublic class RedisDelayQueueImpl<E extends DelayMessage> implements RedisDelayQueue<E> {    private Logger logger = LoggerFactory.getLogger(getClass());    @Autowired    private StringRedisTemplate redisTemplate;    @Override    public void poll() {        // todo    }        @SneakyThrows    @Override    public void push(E e) {        try {            String jsonStr = JSON.toJSONString(e);            String topic = e.getTopic();            String zkey = String.format("delay:wait:%s", topic);            String u =                    "redis.call('sadd', KEYS[1], ARGV[1])\n" +                            "redis.call('zadd', KEYS[2], ARGV[2], ARGV[3])\n" +                            "return 1";            Object[] keys = new Object[]{serialize(META_TOPIC_WAIT), serialize(zkey)};            Object[] values = new Object[]{ serialize(zkey), serialize(String.valueOf(e.getDelayTime())),serialize(jsonStr)};            Long result = redisTemplate.execute((RedisCallback<Long>) connection -> {                Object nativeConnection = connection.getNativeConnection();                if (nativeConnection instanceof RedisAsyncCommands) {                    RedisAsyncCommands commands = (RedisAsyncCommands) nativeConnection;                    return (Long) commands.getStatefulConnection().sync().eval(u, ScriptOutputType.INTEGER, keys, values);                } else if (nativeConnection instanceof RedisAdvancedClusterAsyncCommands) {                    RedisAdvancedClusterAsyncCommands commands = (RedisAdvancedClusterAsyncCommands) nativeConnection;                    return (Long) commands.getStatefulConnection().sync().eval(u, ScriptOutputType.INTEGER, keys, values);                }                return 0L;            });            logger.info("延迟队列[1],消息推送成功进入等待队列({}), topic: {}", result != null && result > 0, e.getTopic());        } catch (Throwable t) {            t.printStackTrace();        }    }    private byte[] serialize(String key) {        RedisSerializer<String> stringRedisSerializer =                (RedisSerializer<String>) redisTemplate.getKeySerializer();        //lettuce连接包下序列化键值,否则无法用默认的ByteArrayCodec解析        return stringRedisSerializer.serialize(key);    }}

定时任务

@Componentpublic class DistributeTask {    private static final String LUA_SCRIPT;    private Logger logger = LoggerFactory.getLogger(getClass());    @Autowired    private StringRedisTemplate redisTemplate;    static {        StringBuilder sb = new StringBuilder(128);        sb.append("local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1], 'limit', 0, 1)\n");        sb.append("if(next(val) ~= nil) then\n");        sb.append("    redis.call('sadd', KEYS[2], ARGV[2])\n");        sb.append("    redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)\n");        sb.append("    for i = 1, #val, 100 do\n");        sb.append("        redis.call('rpush', KEYS[3], unpack(val, i, math.min(i+99, #val)))\n");        sb.append("    end\n");        sb.append("    return 1\n");        sb.append("end\n");        sb.append("return 0");        LUA_SCRIPT = sb.toString();    }        @Scheduled(cron = "0/5 * * * * ?")    public void scheduledTaskByCorn() {        try {            Set<String> members = redisTemplate.opsForSet().members(META_TOPIC_WAIT);            assert members != null;            for (String k : members) {                if (!redisTemplate.hasKey(k)) {                    // 如果 KEY 不存在元数据中删除                    redisTemplate.opsForSet().remove(META_TOPIC_WAIT, k);                    continue;                }                String lk = k.replace("delay:wait", "delay:active");                Object[] keys = new Object[]{serialize(k), serialize(META_TOPIC_ACTIVE), serialize(lk)};                Object[] values = new Object[]{serialize(String.valueOf(System.currentTimeMillis())), serialize(lk)};                Long result = redisTemplate.execute((RedisCallback<Long>) connection -> {                    Object nativeConnection = connection.getNativeConnection();                    if (nativeConnection instanceof RedisAsyncCommands) {                        RedisAsyncCommands commands = (RedisAsyncCommands) nativeConnection;                        return (Long) commands.getStatefulConnection().sync().eval(LUA_SCRIPT, ScriptOutputType.INTEGER, keys, values);                    } else if (nativeConnection instanceof RedisAdvancedClusterAsyncCommands) {                        RedisAdvancedClusterAsyncCommands commands = (RedisAdvancedClusterAsyncCommands) nativeConnection;                        return (Long) commands.getStatefulConnection().sync().eval(LUA_SCRIPT, ScriptOutputType.INTEGER, keys, values);                    }                    return 0L;                });                logger.info("延迟队列[2],消息到期进入执行队列({}): {}", result != null && result > 0, TOPIC_ACTIVE);            }        } catch (Throwable t) {            t.printStackTrace();        }    }    private byte[] serialize(String key) {        RedisSerializer<String> stringRedisSerializer =                (RedisSerializer<String>) redisTemplate.getKeySerializer();        //lettuce连接包下序列化键值,否则无法用默认的ByteArrayCodec解析        return stringRedisSerializer.serialize(key);    }}

以上就是使用Redis怎么实现延迟队列,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注编程网行业资讯频道。

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯