别再用 Redis List 实现消息队列了,Stream 专为队列而生
使用 Redis 的 List 实现消息队列有很多局限性,比如:
没有良好的 ACK 机制;
没有 ConsumerGroup 消费组概念;
消息堆积。
List 是线性结构,想要查询指定数据需要遍历整个列表;
Stream 是 Redis 5.0 引入的一种专门为消息队列设计的数据类型,Stream 是一个包含 0 个或者多个元素的有序队列,这些元素根据 ID 的大小进行有序排列。
它实现了大部分消息队列的功能:
消息 ID 系列化生成;
消息遍历;
消息的阻塞和非阻塞读;
Consumer Groups 消费组;
ACK 确认机制。
支持多播。
提供了很多消息队列操作命令,并且借鉴 Kafka 的 Consumer Groups 的概念,提供了消费组功能。
同时提供了消息的持久化和主从复制机制,客户端可以访问任何时刻的数据,并且能记住每一个客户端的访问位置,从而保证消息不丢失。
废话少说,先来看下如何使用,官网文档详见:https://redis.io/topics/streams-intro
XADD:插入消息
XADD 云岚宗 * task kill name 萧炎
"1645936602161-0"
XADD streamName id field value [field value ...]
当前毫秒内的时间戳; 顺序编号。从 0 为起始值,用于区分同一时间内产生的多个命令。
XREAD:读取消息
XREAD COUNT 1 BLOCK 0 STREAMS 云岚宗 0-0
1) 1) "\xe4\xba\x91\xe5\xb2\x9a\xe5\xae\x97"
2) 1) 1) "1645936602161-0"
2) 1) "task"
2) "kill"
3) "name"
4) "萧炎" # 萧炎
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
COUNT:表示每个流中最多读取的元素个数; BLOCK:阻塞读取,当消息队列没有消息的时候,则阻塞等待, 0 表示无限等待,单位是毫秒。 ID:消息 ID,在读取消息的时候可以指定 ID,并从这个 ID 的下一条消息开始读取,0-0 则表示从第一个元素开始读取。
XREAD COUNT 1 BLOCK 0 STREAMS 云岚宗 $
XREAD COUNT 2 BLOCK 0 STREAMS 云岚宗 0-0
指令的时候又会重新读取到。ConsumerGroup
Redis Stream 的结构如上图所示。有一个消息链表,每个消息都有一个唯一的 ID 和对应的内容; 消息持久化; 每个消费组的状态是独立的,不不影响,同一份的 Stream 消息会被所有的消费组消费; 一个消费组可以由多个消费者组成,消费者之间是竞争关系,任意一个消费者读取了消息都会使 last_deliverd_id 往前移动; 每个消费者有一个 pending_ids 变量,用于记录当前消费者读取了但是还没 ack 的消息。它用来保证消息至少被客户端消费了一次。
XGROUP用于创建、销毁和管理消费者组。 XREADGROUP通过消费组从流中读取数据。 XACK是允许消费者将待处理消息标记为已正确处理的命令。
创建消费组
XGROUP CREATE
指令创建消费组 (Consumer Group),需要传递起始消息 ID 参数用来初始化 last_delivered_id
变量。XADD bossStream * name zhangsan age 26
XADD bossStream * name lisi age 2
XADD bossStream * name bigold age 40
# 语法如下
# XGROUP CREATE stream group start_id
XGROUP CREATE bossStream 青龙门 0-0 MKSTREAM
XGROUP CREATE bossStream 六扇门 0-0 MKSTREAM
stream:指定队列的名字; group:指定消费组名字; start_id:指定消费组在 Stream 中的起始 ID,它决定了消费者组从哪个 ID 之后开始读取消息, 0-0
从第一条开始读取,$
表示从最后一条向后开始读取,只接收新消息。MKSTREAM:默认情况下,XGROUP CREATE命令在目标流不存在时返回错误。可以使用可选 MKSTREAM
子命令作为 之后的最后一个参数来自动创建流。
读取消息
consumer1
从bossStream
阻塞读取一条消息:XREADGROUP GROUP 青龙门 consumer1 COUNT 1 BLOCK 0 STREAMS bossStream >
1) 1) "bossStream"
2) 1) 1) "1645957821396-0"
2) 1) "name"
2) "zhangsan"
3) "age"
4) "26"
XREADGROUP GROUP groupName consumerName [COUNT n] [BLOCK ms] STREAMS streamName [stream ...] id [id ...]
XREAD
大同小异,区别在于新增 GROUP groupName consumerName
选项。>
:命令的最后参数>
,表示从尚未被消费的消息开始读取;BLOCK:阻塞读取;
consumer2
执行读取操作:XREADGROUP GROUP 青龙门 consumer2 COUNT 1 BLOCK 0 STREAMS bossStream >
1) 1) "bossStream"
2) 1) 1) "1645957838700-0"
2) 1) "name"
2) "lisi"
3) "age"
4) "2"
consumer2
不能再读取到 zhangsan
了,而是读取下一条 lisi
因为这条消息已经被 consumer1
读取了。XPENDING 查看已读未确认消息
XREADGROUP GROUP groupName consumerName
读取消息,但是没有给 Stream 发送 XACK
命令,消息依然保留。bossStream
中的 消费组「青龙门」中各个消费者已读取未确认的消息信息:XPENDING bossStream 青龙门
1) (integer) 2
2) "1645957821396-0"
3) "1645957838700-0"
4) 1) 1) "consumer1"
2) "1"
2) 1) "consumer2"
2) "1"
1)
未确认消息条数;2) ~ 3)
青龙门中所有消费者读取的消息最小和最大 ID;
consumer1
读取了哪些数据,使用以下命令:XPENDING bossStream 青龙门 - + 10 consumer1
1) 1) "1645957821396-0"
2) "consumer1"
3) (integer) 3758384
4) (integer) 1
ACK 确认
XACK bossStream 青龙门 1645957821396-0 1645957838700-0
(integer) 2
XACK key group-key ID [ID ...]
使用 Redisson 实战
<dependency>
<groupId>org.redissongroupId>
<artifactId>redisson-spring-boot-starterartifactId>
<version>3.16.7version>
dependency>
spring:
application:
name: redission
redis:
host: 127.0.0.1
port: 6379
ssl: false
@Slf4j
@Service
public class QueueService {
@Autowired
private RedissonClient redissonClient;
/**
* 发送消息到队列
*
* @param message
*/
public void sendMessage(String message) {
RStreamstream = redissonClient.getStream("sensor#4921");
stream.add("speed", "19");
stream.add("velocity", "39%");
stream.add("temperature", "10C");
}
/**
* 消费者消费消息
*
* @param message
*/
public void consumerMessage(String message) {
RStreamstream = redissonClient.getStream("sensor#4921");
stream.createGroup("sensors_data", StreamMessageId.ALL);
Map> messages = stream.readGroup("sensors_data", "consumer_1");
for (Map.Entry> entry : messages.entrySet()) {
Mapmsg = entry.getValue();
System.out.println(msg);
stream.ack("sensors_data", entry.getKey());
}
}
}
有道无术,术可成;有术无道,止于术
欢迎大家关注Java之道公众号
好文章,我在看❤️