【GoCN酷Go推荐】异步任务队列Asynq
什么是Asynq
Asynq
是一个go
语言实现的分布式任务队列和异步处理库,基于redis
,类似sidekiq
和celery
,他具有以下特点:
保证至少执行一次任务 持久化 失败重试 worker崩溃自动恢复 优先队列 暂停队列 支持中间件 允许唯一任务 支持Redis Cluster实现自动分片 支持Redis Sentinels实现高可用 提供web ui管理 提供cli管理
go get -u github.com/hibiken/asynq
// 命令行工具:
go get -u github.com/hibiken/asynq/tools/asynq
使用
前提需要保证redis可用
main.go
Asynq服务端worker.go
处理程序asynq_test.go
模拟客户端使用
Asynq Server
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"time"
"github.com/hibiken/asynq"
"golang.org/x/sys/unix"
)
func main() {
// asynq server
srv := asynq.NewServer(
asynq.RedisClientOpt{
Addr: ":6379",
Password: "Your password",
DB: 0,
},
asynq.Config{Concurrency: 20},
)
mux := asynq.NewServeMux()
// some middlewares
mux.Use(func(next asynq.Handler) asynq.Handler {
return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
// just record a log
fmt.Println(fmt.Printf("[%s] log - %+v", time.Now().Format("2006-01-02 15:04:05"), t.Payload))
return next.ProcessTask(ctx, t)
})
})
// some workers
mux.HandleFunc("msg", HandleMsg)
// start server
if err := srv.Start(mux); err != nil {
log.Fatalf("could not start server: %v", err)
}
// Wait for termination signal.
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, unix.SIGTERM, unix.SIGINT, unix.SIGTSTP)
for {
s := <-sigs
if s == unix.SIGTSTP {
srv.Quiet() // Stop processing new tasks
continue
}
break
}
// Stop worker server.
srv.Stop()
}
Asynq Workers
package main
import (
"context"
"fmt"
"github.com/hibiken/asynq"
)
// HandleMsg 处理msg
func HandleMsg(ctx context.Context, t *asynq.Task) (err error) {
fmt.Println("------HandleMsg start------")
message, _ := t.Payload.GetString("message")
userid, _ := t.Payload.GetInt("user_id")
fmt.Println(fmt.Printf("{message: \"%s\", user_id: %d}", message, userid))
return
}
模拟使用
package main
import (
"fmt"
"os"
"testing"
"time"
"github.com/hibiken/asynq"
)
var c *asynq.Client
func TestMain(m *testing.M) {
r := asynq.RedisClientOpt{
Addr: ":6379",
Password: "Your password",
DB: 0,
}
c = asynq.NewClient(r)
ret := m.Run()
c.Close()
os.Exit(ret)
}
// 即时消费
func Test_Enqueue(t *testing.T) {
payload := map[string]interface{}{"user_id": 1, "message": "i'm immediately message"}
task := asynq.NewTask("msg", payload)
res, err := c.Enqueue(task)
if err != nil {
t.Errorf("could not enqueue task: %v", err)
t.FailNow()
}
fmt.Printf("Enqueued Result: %+v\n", res)
}
// 延时消费
func Test_EnqueueDelay(t *testing.T) {
payload := map[string]interface{}{"user_id": 1, "message": "i'm delay 5 seconds message"}
task := asynq.NewTask("msg", payload)
res, err := c.Enqueue(task, asynq.ProcessIn(5*time.Second))
// res, err := c.Enqueue(task, asynq.ProcessAt(time.Now().Add(5*time.Second)))
if err != nil {
t.Errorf("could not enqueue task: %v", err)
t.FailNow()
}
fmt.Printf("Enqueued Result: %+v\n", res)
}
// 超时、重试、过期
func Test_EnqueueOther(t *testing.T) {
payload := map[string]interface{}{"user_id": 1, "message": "i'm delay 5 seconds message"}
task := asynq.NewTask("msg", payload)
// 10秒超时,最多重试3次,20秒后过期
res, err := c.Enqueue(task, asynq.MaxRetry(3), asynq.Timeout(10*time.Second), asynq.Deadline(time.Now().Add(20*time.Second)))
if err != nil {
t.Errorf("could not enqueue task: %v", err)
t.FailNow()
}
fmt.Printf("Enqueued Result: %+v\n", res)
}
如何测试
先将服务运行起来
$ go version
go version go1.16 darwin/amd64
$ go run .
运行指定测试
$ go test -timeout 30s -run ^Test_Enqueue$ asynq_test -v -count=1
=== RUN Test_Enqueue
Enqueued Result: &{ID:683d8f36-f8c5-49c0-88b4-f1aefa7686de EnqueuedAt:2021-06-11 10:41:49.018475 +0000 UTC ProcessAt:2021-06-11 18:41:49.017778 +0800 CST m=+0.000892619 Retry:25 Queue:default Timeout:30m0s Deadline:1970-01-01 08:00:00 +0800 CST}
--- PASS: Test_Enqueue (0.00s)
PASS
ok asynq_test 0.009s
队列管理
Asynq提供了webui 和 命令行工具asynq
webui
Asynqmon webui在这个仓库里
$ ./asynqmon --port=3000 --redis-addr=localhost:6380
asynq命令行
$ asynq -p Yourpassword stats
Task Count by State
active pending scheduled retry archived
---------- -------- --------- ----- ----
0 0 0 0 0
Task Count by Queue
default
-------
0
Daily Stats 2021-06-11 UTC
processed failed error rate
--------- ------ ----------
4 0 0.00%
Redis Info
version uptime connections memory usage peak memory usage
------- ------ ----------- ------------ -----------------
6.2.0 0 days 5 16.04MB 16.14MB
完整代码官方文档:https://github.com/hibiken/asynq
更多请查看:https://github.com/hibiken/asynq
欢迎加入我们GOLANG中国社区:https://gocn.vip/
《酷Go推荐》招募:
各位Gopher同学,最近我们社区打算推出一个类似GoCN每日新闻的新栏目《酷Go推荐》,主要是每周推荐一个库或者好的项目,然后写一点这个库使用方法或者优点之类的,这样可以真正的帮助到大家能够学习到
新的库,并且知道怎么用。
大概规则和每日新闻类似,如果报名人多的话每个人一个月轮到一次,欢迎大家报名!(报名地址:https://wj.qq.com/s2/7734329/3f51)
扫码也可以加入 GoCN 的大家族哟~
Gopher China2021大会日程详情来了!
点击下方「阅读原文」即可报名参加大会
评论