用 Redis 做一个可靠的延迟队列
程序员的成长之路
共 10643字,需浏览 22分钟
· 2022-08-04
阅读本文大概需要 7 分钟。
来自:cnblogs.com/Finley/p/16400287.html
当订单一直处于未支付状态时,如何及时的关闭订单,并退还库存? 新创建店铺,N天内没有上传商品,系统如何知道该信息,并发送激活短信?
DelayQueue
等方式实现延时任务。我们在之前的文章中讨论过他们的缺陷:比如使用 Redis 过期通知不保证准时、发送即忘不保证送达,时间轮缺乏持久化机制容易丢失等。持久化: 服务重启或崩溃不能丢失任务 确认重试机制: 任务处理失败或超时应该有重试 定时尽量精确
member
投递时间戳作为 score
,使用 zrangebyscore
命令搜索已到投递时间的消息然后将其发给消费者。提供 ACK 和重试机制 只需要 Redis 和消费者即可运行,无需其它组件 提供 At-Least-One 投递语义、并保证消息不会并发消费
https://github.com/HDT3213/delayqueue
go get github.com/hdt3213/delayqueue
完成安装。start()
即可:package main
import (
"github.com/go-redis/redis/v8"
"github.com/hdt3213/delayqueue"
"strconv"
"time"
)
func main() {
redisCli := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
})
queue := delayqueue.NewQueue("example-queue", redisCli, func(payload string) bool {
// 注册处理消息的回调函数
// 返回 true 表示已成功消费,返回 false 消息队列会重新投递次消息
return true
})
// 发送延时消息
for i := 0; i < 10; i++ {
err := queue.SendDelayMsg(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3))
if err != nil {
panic(err)
}
}
// 启动消费协程
done := queue.StartConsume()
// 阻塞等待消费协程退出
<-done
}
原理详解
msgKey
: 为了避免两条内容完全相同的消息造成意外的影响,我们将每条消息放到一个字符串类型的键中,并分配一个 UUID 作为它的唯一标识。其它数据结构中只存储 UUID 而不存储完整的消息内容。每个 msg 拥有一个独立的 key 而不是将所有消息放到一个哈希表是为了利用 TTL 机制避免pendingKey
: 有序集合类型,member 为消息 ID, score 为投递时间的 unix 时间戳。readyKey
: 列表类型,需要投递的消息 ID。unAckKey
: 有序集合类型,member 为消息 ID, score 为重试时间的 unix 时间戳。retryKey
: 列表类型,已到重试时间的消息 IDgarbageKey
: 集合类型,用于暂存已达重试上线的消息 IDretryCountKey
: 哈希表类型,键为消息 ID, 值为剩余的重试次数
都是原子性的 不会重复处理同一条消息 操作前后消息队列始终处于正确的状态
pending2ReadyScript
pending2ReadyScript
使用 zrangebyscore
扫描已到投递时间的消息ID并把它们移动到 ready
中:-- keys: pendingKey, readyKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1]) -- 从 pending key 中找出已到投递时间的消息
if (#msgs == 0) then return end
local args2 = {'LPush', KEYS[2]} -- 将他们放入 ready key 中
for _,v in ipairs(msgs) do
table.insert(args2, v)
end
redis.call(unpack(args2))
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- 从 pending key 中删除已投递的消息
ready2UnackScript
ready2UnackScript
从 ready
或者 retry
中取出一条消息发送给消费者并放入 unack
中,类似于 RPopLPush
:-- keys: readyKey/retryKey, unackKey
-- argv: retryTime
local msg = redis.call('RPop', KEYS[1])
if (not msg) then return end
redis.call('ZAdd', KEYS[2], ARGV[1], msg)
return msg
unack2RetryScript
unack2RetryScript
从 retry
中找出所有已到重试时间的消息并把它们移动到 unack
中:-- keys: unackKey, retryCountKey, retryKey, garbageKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1]) -- 找到已到重试时间的消息
if (#msgs == 0) then return end
local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- 查询剩余重试次数
for i,v in ipairs(retryCounts) do
local k = msgs[i]
if tonumber(v) > 0 then -- 剩余次数大于 0
redis.call("HIncrBy", KEYS[2], k, -1) -- 减少剩余重试次数
redis.call("LPush", KEYS[3], k) -- 添加到 retry key 中
else -- 剩余重试次数为 0
redis.call("HDel", KEYS[2], k) -- 删除重试次数记录
redis.call("SAdd", KEYS[4], k) -- 添加到垃圾桶,等待后续删除
end
end
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- 将已处理的消息从 unack key 中删除
KEYS
参数中声明自己要访问的 key, 而我们将每个 msg
有一个独立的 key
,我们在执行 unack2RetryScript
之前是不知道哪些 msg key
需要被删除。所以 lua 脚本只将需要删除的消息记在 garbage key
中,脚本执行完后再通过 del 命令将他们删除:func (q *DelayQueue) garbageCollect() error {
ctx := context.Background()
msgIds, err := q.redisCli.SMembers(ctx, q.garbageKey).Result()
if err != nil {
return fmt.Errorf("smembers failed: %v", err)
}
if len(msgIds) == 0 {
return nil
}
// allow concurrent clean
msgKeys := make([]string, 0, len(msgIds))
for _, idStr := range msgIds {
msgKeys = append(msgKeys, q.genMsgKey(idStr))
}
err = q.redisCli.Del(ctx, msgKeys...).Err()
if err != nil && err != redis.Nil {
return fmt.Errorf("del msgs failed: %v", err)
}
err = q.redisCli.SRem(ctx, q.garbageKey, msgIds).Err()
if err != nil && err != redis.Nil {
return fmt.Errorf("remove from garbage key failed: %v", err)
}
return nil
}
ack
func (q *DelayQueue) ack(idStr string) error {
ctx := context.Background()
err := q.redisCli.ZRem(ctx, q.unAckKey, idStr).Err()
if err != nil {
return fmt.Errorf("remove from unack failed: %v", err)
}
// msg key has ttl, ignore result of delete
_ = q.redisCli.Del(ctx, q.genMsgKey(idStr)).Err()
q.redisCli.HDel(ctx, q.retryCountKey, idStr)
return nil
}
unack key
中消息的重试时间改为现在,随后执行的 unack2RetryScript
会立即将它移动到 retry key
func (q *DelayQueue) nack(idStr string) error {
ctx := context.Background()
// update retry time as now, unack2Retry will move it to retry immediately
err := q.redisCli.ZAdd(ctx, q.unAckKey, &redis.Z{
Member: idStr,
Score: float64(time.Now().Unix()),
}).Err()
if err != nil {
return fmt.Errorf("negative ack failed: %v", err)
}
return nil
}
consume
consume
函数,它负责调用上述脚本将消息转移到正确的集合中并回调 consumer
来消费消息:func (q *DelayQueue) consume() error {
// 执行 pending2ready,将已到时间的消息转移到 ready
err := q.pending2Ready()
if err != nil {
return err
}
// 循环调用 ready2Unack 拉取消息进行消费
var fetchCount uint
for {
idStr, err := q.ready2Unack()
if err == redis.Nil { // consumed all
break
}
if err != nil {
return err
}
fetchCount++
ack, err := q.callback(idStr)
if err != nil {
return err
}
if ack {
err = q.ack(idStr)
} else {
err = q.nack(idStr)
}
if err != nil {
return err
}
if fetchCount >= q.fetchLimit {
break
}
}
// 将 nack 或超时的消息放入重试队列
err = q.unack2Retry()
if err != nil {
return err
}
// 清理已达到最大重试次数的消息
err = q.garbageCollect()
if err != nil {
return err
}
// 消费重试队列
fetchCount = 0
for {
idStr, err := q.retry2Unack()
if err == redis.Nil { // consumed all
break
}
if err != nil {
return err
}
fetchCount++
ack, err := q.callback(idStr)
if err != nil {
return err
}
if ack {
err = q.ack(idStr)
} else {
err = q.nack(idStr)
}
if err != nil {
return err
}
if fetchCount >= q.fetchLimit {
break
}
}
return nil
}
推荐阅读:
替代ELK:ClickHouse+Kafka+FlieBeat
互联网初中高级大厂面试题(9个G) 内容包含Java基础、JavaWeb、MySQL性能优化、JVM、锁、百万并发、消息队列、高性能缓存、反射、Spring全家桶原理、微服务、Zookeeper......等技术栈!
⬇戳阅读原文领取! 朕已阅
评论
真高!比亚迪员工爆料比亚迪在越南的薪资水平:基本工资480万,全勤奖35万,交通补助20万,餐补110万,每周6天,每天10小时
上一篇:某大公司为逼迫员工离职,竟然把他的工位安排到厕所旁,没想到他直接开始记录领导的如厕时间,还发到公司大群...对此,你怎么看?--完--PS:欢迎在留言区留下你的观点,一起讨论提高。如果今天的文章让你有新的启发,欢迎转发分享给更多人。全文完,感谢你的耐心阅读。如果你还想看到我的文章,请一定给本
开发者全社区
0
某大公司为逼迫员工离职,竟然把他的工位安排到厕所旁,没想到他直接开始记录领导的如厕时间,还发到公司大群...
上一篇:字节的跳动职级与薪资(2024年)我们与公司间的合作,宛如两艘船只在茫茫大海上相互依靠,共同抵御风浪,携手驶向成功的彼岸。然而,当航向开始产生分歧,或是波涛汹涌的风浪改变了我们的初衷,我们或许应当冷静地选择和平分手,而非在风雨中硬撑。最近,一位网友的遭遇引起了广大职场人的关注和热议。这位网友
开发者全社区
0
金融研究 | 使用Python测量关键审计事项的「信息含量」
Tips: 公众号推送后内容只能更改一次,且只能改20字符。如果内容出问题,或者想更新内容, 只能重复推送。为了更好的阅读体验,建议阅读本文博客版, 链接地址https://textdata.cn/blog/2023-01-13-information-content-of-critical-aud
大邓和他的Python
0
我看阿里的年终奖总算发了!
到4月底了,这两天看朋友圈,发现阿里的年终奖终于发了,问了问老同学,也从网上检索了不少信息,基本搞清楚了阿里今年的年终奖情况。近来来阿里一些集团对绩效等级做了较大的调整,以前的旧绩效系统中,绩效分为3.25、3.5、3.75、4和5五个等级,其中4和5是较高绩效等级,较少见。而且之前3.5绩效内部划
公子龙
0
CVPR 2024|大视觉模型的开山之作!无需任何语言数据即可打造大视觉模型
↑ 点击蓝字 关注极市平台作者丨科技猛兽编辑丨极市平台极市导读 本文提出一种序列建模 (sequential modeling) 的方法,不使用任何语言数据,训练大视觉模型。>>加入极市CV技术交流群,走在计算机视觉的最前沿本文目录1 序列建模打造大视觉模型(来自 U
极市平台
1
金融研究(更新) | 使用Python构建关键审计事项的「信息含量」
Tips: 公众号推送后内容只能更改一次,且只能改20字符。如果内容出问题,或者想更新内容, 只能重复推送。为了更好的阅读体验,建议阅读本文博客版, 链接地址https://textdata.cn/blog/2023-01-13-information-content-of-critical-aud
大邓和他的Python
0
字节的跳动职级与薪资(2024年)
上一篇:阿里公布年终奖,P7, 3.5+,22W年终奖,还有35W长期现金激励,真香字节跳动自2012年3月成立以来,已经迅速成长为一个全球性的科技公司。其产品和服务已经遍布全球150多个国家与地区,并且支持超过75种不同的语言。在字节跳动的官方网站上,列出了一系列引人注目的产品和服务,包括但不限于
开发者全社区
0
盘点Lombok的几个骚操作,你绝对没用过!
👉 欢迎加入小哈的星球 ,你将获得: 专属的项目实战 / Java 学习路线 / 一对一提问 / 学习打卡 / 赠书福利全栈前后端分离博客项目 2.0 版本完结啦, 演示链接:http://116.62.199.48/ ,新项目正在酝酿中
小哈学Java
0