文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Redis中Stream类型怎么用

2023-06-25 14:37

关注

这篇文章主要介绍Redis中Stream类型怎么用,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!

一、背景

最近在看redis这方面的知识,发现在redis5中产生了一种新的数据类型Stream,它和kafka的设计有些类似,可以当作一个简单的消息队列来使用。

二、redis中Stream类型的特点

三、Stream的结构

Redis中Stream类型怎么用

解释:

消费者组: Consumer Group,即使用 XGROUP CREATE 命令创建的,一个消费者组中可以存在多个消费者,这些消费者之间是竞争关系。

消费者: Consumer 消费消息。

last_delivered_id: 这个id保证了在同一个消费者组中,一个消息只能被一个消费者获取。每当消费者组的某个消费者读取到了这个消息后,这个last_delivered_id的值会往后移动一位,保证消费者不会读取到重复的消息。

pending_ids:记录了消费者读取到的消息id列表,但是这些消息可能还没有处理,如果认为某个消息处理,需要调用ack命令。这样就确保了某个消息一定会被执行一次。

消息内容:是一个键值对的格式。

Stream 中 消息的 ID: 默认情况下,ID使用 * ,redis可以自动生成一个,格式为 时间戳-序列号,也可以自己指定,一般使用默认生成的即可,且后生成的id号要比之前生成的大。

四、Stream的命令

1、XADD 往Stream末尾添加消息

1、命令格式
xadd key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]

Redis中Stream类型怎么用

2、举例

xadd 命令 返回的是数据的id, xx-yy (xx指的是毫秒数,yy指的是在这个毫秒内的第几条消息)

向流中增加一条数据,

127.0.0.1:6379> xadd stream-key * username zhangsan # 向stream-key这个流中增加一个 username 是zhangsan的数据 *表示自动生成id"1635999858912-0" # 返回的是ID127.0.0.1:6379> keys *1) "stream-key" # 可以看到stream自动创建了127.0.0.1:6379>

向流中增加数据,不自动创建流

127.0.0.1:6379> xadd not-exists-stream nomkstream * username lisi # 因为指定了nomkstream参数,而not-exists-stream之前不存在,所以加入失败(nil)127.0.0.1:6379> keys *(empty array)127.0.0.1:6379>

手动指定ID的值

127.0.0.1:6379> xadd stream-key 1-1 username lisi # 此处id的值是自己传递的1-1,而不是使用*自动生成"1-1" # 返回的是id的值127.0.0.1:6379>

设置一个固定大小的Stream1、精确指定Stream的大小

指定指定Stream的大小比模糊指定Stream的大小会稍微多少消耗一些性能。

Redis中Stream类型怎么用

模糊指定Stream的大小

127.0.0.1:6379> xadd stream-key maxlen ~ 1 * first first"1636001034141-0"127.0.0.1:6379> xadd stream-key maxlen ~ 1 * second second"1636001044506-0"127.0.0.1:6379> xadd stream-key maxlen ~ 1 * third third"1636001057846-0"127.0.0.1:6379> xinfo stream stream-key 1) "length" 2) (integer) 3 3) "radix-tree-keys" 4) (integer) 1 5) "radix-tree-nodes" 6) (integer) 2 7) "last-generated-id" 8) "1636001057846-0" 9) "groups"10) (integer) 011) "first-entry"12) 1) "1636001034141-0"    2) 1) "first"       2) "first"13) "last-entry"14) 1) "1636001057846-0"    2) 1) "third"       2) "third"127.0.0.1:6379>

~ 模糊指定流的大小,可以看到指定的是1,实际上已经到了3.

2、XRANGE查看Stream中的消息

1、命令格式
xrange key start end [COUNT count]

Redis中Stream类型怎么用

2、准备数据
127.0.0.1:6379> multiOK127.0.0.1:6379(TX)> xadd stream-key * username zhangsanQUEUED127.0.0.1:6379(TX)> xadd stream-key * username lisiQUEUED127.0.0.1:6379(TX)> exec1) "1636003481706-0"2) "1636003481706-1"127.0.0.1:6379> xadd stream-key * username wangwu"1636003499055-0"127.0.0.1:6379>

使用redis的事务操作,获取到同一毫秒产生的多条数据,时间戳一样,序列号不一样

3、举例

获取所有的数据(-+的使用)

127.0.0.1:6379> xrange stream-key - +1) 1) "1636003481706-0"   2) 1) "username"      2) "zhangsan"2) 1) "1636003481706-1"   2) 1) "username"      2) "lisi"3) 1) "1636003499055-0"   2) 1) "username"      2) "wangwu"127.0.0.1:6379>

-: 表示最小id的值

+:表示最大id的值

获取指定id范围内的数据,闭区间

127.0.0.1:6379> xrange stream-key 1636003481706-1 1636003499055-01) 1) "1636003481706-1"   2) 1) "username"      2) "lisi"2) 1) "1636003499055-0"   2) 1) "username"      2) "wangwu"127.0.0.1:6379>

获取指定id范围内的数据,开区间

127.0.0.1:6379> xrange stream-key (1636003481706-0 (1636003499055-01) 1) "1636003481706-1"   2) 1) "username"      2) "lisi"127.0.0.1:6379>

(:表示开区间

获取某个毫秒后所有的数据

127.0.0.1:6379> xrange stream-key 1636003481706 +1) 1) "1636003481706-0"   2) 1) "username"      2) "zhangsan"2) 1) "1636003481706-1"   2) 1) "username"      2) "lisi"3) 1) "1636003499055-0"   2) 1) "username"      2) "wangwu"127.0.0.1:6379>

直接写毫秒不写后面的序列号即可。

获取单条数据

127.0.0.1:6379> xrange stream-key 1636003499055-0 1636003499055-01) 1) "1636003499055-0"   2) 1) "username"      2) "wangwu"127.0.0.1:6379>

startend的值写的一样即可获取单挑数据。

获取固定条数的数据

127.0.0.1:6379> xrange stream-key - + count 11) 1) "1636003481706-0"   2) 1) "username"      2) "zhangsan"127.0.0.1:6379>

使用 count进行限制

3、XREVRANGE反向查看Stream中的消息

XREVRANGE key end start [COUNT count]

使用方式和XRANGE类似,略。

4、XDEL删除消息

1、命令格式
xdel key ID [ID ...]
2、准备数据
127.0.0.1:6379> xadd stream-key * username zhangsan"1636004176924-0"127.0.0.1:6379> xadd stream-key * username lisi"1636004183638-0"127.0.0.1:6379> xadd stream-key * username wangwu"1636004189211-0"127.0.0.1:6379>
3、举例

需求:往Stream中加入3条消息,然后删除第2条消息

127.0.0.1:6379> xdel stream-key 1636004183638-0(integer) 1 # 返回的是删除记录的数量127.0.0.1:6379> xrang stream -key - +127.0.0.1:6379> xrange stream-key - +1) 1) "1636004176924-0"   2) 1) "username"      2) "zhangsan"2) 1) "1636004189211-0"   2) 1) "username"      2) "wangwu"127.0.0.1:6379>

注意:

需要注意的是,我们从Stream中删除一个消息,这个消息并不是被真正的删除了,而是被标记为删除,这个时候这个消息还是占据着内容空间的。如果所有Stream中所有的消息都被标记删除,这个时候才会回收内存空间。但是这个Stream并不会被删除。

5、XLEN查看Stream中元素的长度

1、命令格式
xlen key
2、举例

查看Stream中元素的长度

127.0.0.1:6379> xadd stream-key * username zhangsan"1636004690578-0"127.0.0.1:6379> xlen stream-key(integer) 1127.0.0.1:6379> xlen not-exists-stream-key(integer) 0127.0.0.1:6379>

注意:

如果xlen后方的key不存在则返回0,否则返回元素的个数。

6、XTRIM对Stream中的元素进行修剪

1、命令格式
xtrim key MAXLEN|MINID [=|~] threshold [LIMIT count]
2、准备数据
127.0.0.1:6379>  xadd stream-key * username zhangsan"1636009745401-0"127.0.0.1:6379> multiOK127.0.0.1:6379(TX)> xadd stream-key * username lisiQUEUED127.0.0.1:6379(TX)> xadd stream-key * username wangwuQUEUED127.0.0.1:6379(TX)> exec1) "1636009763955-0"2) "1636009763955-1"127.0.0.1:6379> xadd stream-key * username zhaoliu"1636009769625-0"127.0.0.1:6379>
3、举例

maxlen精确限制

127.0.0.1:6379> xtrim stream-key maxlen 2 # 保留最后的2个消息(integer) 2127.0.0.1:6379> xrange stream-key - + # 可以看到之前加入的2个消息被删除了1) 1) "1636009763955-1"   2) 1) "username"      2) "wangwu"2) 1) "1636009769625-0"   2) 1) "username"      2) "zhaoliu"127.0.0.1:6379>

上方的意思是,保留stream-key这个Stream中最后的2个消息。

minid模糊限制

minid 是删除比这个id小的数据,本地测试的时候没有测试出来,略。

7、XREAD独立消费消息

XREAD只是读取消息,读取完之后并不会删除消息。 使用XREAD读取消息,是完全独立与消费者组的,多个客户端可以同时读取消息。

1、命令格式
xread [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

Redis中Stream类型怎么用

2、准备数据
127.0.0.1:6379> xadd stream-key * username zhangsan"1636011801365-0"127.0.0.1:6379> xadd stream-key * username lisi"1636011806261-0"127.0.0.1:6379> xadd stream-key * username wangwu"1636011810905-0"127.0.0.1:6379>
3、举例

获取用户名是wangwu的数据

127.0.0.1:6379> xread streams stream-key 1636011806261-0 # 此处写的是lisi的id,即读取到的数据需要是 > 1636011806261-01) 1) "stream-key"   2) 1) 1) "1636011810905-0"         2) 1) "username"            2) "wangwu"

获取2条数据

127.0.0.1:6379> xread count 2 streams stream-key 0-01) 1) "stream-key"   2) 1) 1) "1636011801365-0"         2) 1) "username"            2) "zhangsan"      2) 1) "1636011806261-0"         2) 1) "username"            2) "lisi"127.0.0.1:6379>

count限制单次读取最后的消息,因为当前读取可能没有这么多。

非阻塞读取Stream对尾的数据

即读取队列尾的下一个消息,在非阻塞模式下始终是nil

127.0.0.1:6379> xread streams stream-key $(nil)

阻塞读取Stream对尾的数据

Redis中Stream类型怎么用

注意:

8、消费者组相关操作

1、消费者组命令

Redis中Stream类型怎么用

2、准备数据

创建Stream的名称是 stream-key

创建2个消息,aa和bb

127.0.0.1:6379> xadd stream-key * aa aa"1636362619125-0"127.0.0.1:6379> xadd stream-key * bb bb"1636362623191-0"
3、创建消费者组

创建一个从头开始消费的消费者组

xgroup create stream-key(Stream 名) g1(消费者组名) 0-0(表示从头开始消费)

创建一个从Stream最新的一个消息消费的消费者组

xgroup create stream-key g2 $

$表示从最后一个元素消费,不包括Stream中的最后一个元素,即消费最新的消息。

4、创建一个从某个消息之后消费的消费者组
xgroup create stream-key g3 1636362619125-0  #1636362619125-0 这个是上方aa消息的id的值

1636362619125-0某个消息的具体的ID,这个g3消费者组中的消息都是大于>这个id的消息。

从消费者中读取消息

127.0.0.1:6379> xreadgroup group g1(消费组名) c1(消费者名,自动创建) count 3(读取3条) streams stream-key(Stream 名) >(从该消费者组中还未分配给另外的消费者的消息开始读取)1) 1) "stream-key"   2) 1) 1) "1636362619125-0"         2) 1) "aa"            2) "aa"      2) 1) "1636362623191-0"         2) 1) "bb"            2) "bb"127.0.0.1:6379> xreadgroup group g2 c1 count 3 streams stream-key >(nil) # 返回 nil 是因为 g2消费组是从最新的一条信息开始读取(创建消费者组时使用了$),需要在另外的窗口执行`xadd`命令,才可以再次读取到消息127.0.0.1:6379> xreadgroup group g3 c1 count 3 streams stream-key >  #只读取到一条消息是因为,在创建消费者组时,指定了aa消息的id,bb消息的id大于aa,所以读取出来了。1) 1) "stream-key"   2) 1) 1) "1636362623191-0"         2) 1) "bb"            2) "bb"127.0.0.1:6379>

读取消费者的pending消息

127.0.0.1:6379> xgroup create stream-key g4 0-0OK127.0.0.1:6379> xinfo consumers stream-key g11) 1) "name"   2) "c1"   3) "pending"   4) (integer) 2   5) "idle"   6) (integer) 88792127.0.0.1:6379> xinfo consumers stream-key g4(empty array)127.0.0.1:6379> xreadgroup group g1 c1 count 1 streams stream-key 1636362619125-01) 1) "stream-key"   2) 1) 1) "1636362623191-0"         2) 1) "bb"            2) "bb"127.0.0.1:6379> xreadgroup group g4 c1 count 1 block 0 streams stream-key 1636362619125-01) 1) "stream-key"   2) (empty array)127.0.0.1:6379>

Redis中Stream类型怎么用

转移消费者的消息

127.0.0.1:6379> xpending stream-key g1 - + 10 c11) 1) "1636362619125-0"   2) "c1"   3) (integer) 2686183   4) (integer) 12) 1) "1636362623191-0"   2) "c1"   3) (integer) 102274   4) (integer) 7127.0.0.1:6379> xpending stream-key g1 - + 10 c2(empty array)127.0.0.1:6379> xclaim stream-key g1 c2 102274 1636362623191-01) 1) "1636362623191-0"   2) 1) "bb"      2) "bb"127.0.0.1:6379> xpending stream-key g1 - + 10 c21) 1) "1636362623191-0"   2) "c2"   3) (integer) 17616   4) (integer) 8127.0.0.1:6379>

Redis中Stream类型怎么用

也可以通过xautoclaim来实现。

6、一些监控命令

查看消费组中消费者的pending消息

127.0.0.1:6379> xpending stream-key g1 - + 10 c21) 1) "1636362623191-0"   2) "c2"   3) (integer) 1247680   4) (integer) 8127.0.0.1:6379>

查看消费组中的消费者信息

127.0.0.1:6379> xinfo consumers stream-key g11) 1) "name"   2) "c1"   3) "pending"   4) (integer) 1   5) "idle"   6) (integer) 14748642) 1) "name"   2) "c2"   3) "pending"   4) (integer) 1   5) "idle"   6) (integer) 1290069127.0.0.1:6379>

查看消费组信息

127.0.0.1:6379> xinfo groups stream-key1) 1) "name"   2) "g1"   3) "consumers"   4) (integer) 2   5) "pending"   6) (integer) 2   7) "last-delivered-id"   8) "1636362623191-0"2) 1) "name"   2) "g2"   3) "consumers"......

查看Stream信息

127.0.0.1:6379> xinfo stream stream-key 1) "length" 2) (integer) 2 3) "radix-tree-keys" 4) (integer) 1 5) "radix-tree-nodes" 6) (integer) 2 7) "last-generated-id" 8) "1636362623191-0" 9) "groups"10) (integer) 411) "first-entry"12) 1) "1636362619125-0"    2) 1) "aa"       2) "aa"13) "last-entry"14) 1) "1636362623191-0"    2) 1) "bb"       2) "bb"127.0.0.1:6379>

以上是“Redis中Stream类型怎么用”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注编程网行业资讯频道!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     801人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     348人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     311人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     432人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     220人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯