【GoCN酷Go推荐】异步任务队列Asynq

GoCN

共 7687字,需浏览 16分钟

 ·

2021-06-18 00:10

什么是Asynq

Asynq是一个go语言实现的分布式任务队列和异步处理库,基于redis,类似sidekiqcelery,他具有以下特点:

  • 保证至少执行一次任务
  • 持久化
  • 失败重试
  • worker崩溃自动恢复
  • 优先队列
  • 暂停队列
  • 支持中间件
  • 允许唯一任务
  • 支持Redis Cluster实现自动分片
  • 支持Redis Sentinels实现高可用
  • 提供web ui管理
  • 提供cli管理

安装
go get -u github.com/hibiken/asynq
// 命令行工具:
go get -u github.com/hibiken/asynq/tools/asynq

使用

前提需要保证redis可用

main.go Asynq服务端worker.go 处理程序asynq_test.go 模拟客户端使用

Asynq Server

package main

import (
 "context"
 "fmt"
 "log"
 "os"
 "os/signal"
 "time"

 "github.com/hibiken/asynq"
 "golang.org/x/sys/unix"
)

func main() {
 // asynq server
 srv := asynq.NewServer(
  asynq.RedisClientOpt{
   Addr:     ":6379",
   Password: "Your password",
   DB:       0,
  },
  asynq.Config{Concurrency: 20},
 )

 mux := asynq.NewServeMux()

 // some middlewares
 mux.Use(func(next asynq.Handler) asynq.Handler {
  return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
   // just record a log
   fmt.Println(fmt.Printf("[%s] log - %+v", time.Now().Format("2006-01-02 15:04:05"), t.Payload))

   return next.ProcessTask(ctx, t)
  })
 })

 // some workers
 mux.HandleFunc("msg", HandleMsg)

 // start server
 if err := srv.Start(mux); err != nil {
  log.Fatalf("could not start server: %v", err)
 }

 // Wait for termination signal.
 sigs := make(chan os.Signal, 1)
 signal.Notify(sigs, unix.SIGTERM, unix.SIGINT, unix.SIGTSTP)
 for {
  s := <-sigs
  if s == unix.SIGTSTP {
   srv.Quiet() // Stop processing new tasks
   continue
  }
  break
 }

 // Stop worker server.
 srv.Stop()
}

Asynq Workers

package main

import (
 "context"
 "fmt"

 "github.com/hibiken/asynq"
)

// HandleMsg 处理msg
func HandleMsg(ctx context.Context, t *asynq.Task) (err error) {
 fmt.Println("------HandleMsg start------")

 message, _ := t.Payload.GetString("message")
 userid, _ := t.Payload.GetInt("user_id")

 fmt.Println(fmt.Printf("{message: \"%s\", user_id: %d}", message, userid))
 return
}

模拟使用

package main

import (
 "fmt"
 "os"
 "testing"
 "time"

 "github.com/hibiken/asynq"
)

var c *asynq.Client

func TestMain(m *testing.M) {
 r := asynq.RedisClientOpt{
  Addr:     ":6379",
  Password: "Your password",
  DB:       0,
 }
 c = asynq.NewClient(r)
 ret := m.Run()
 c.Close()
 os.Exit(ret)
}

// 即时消费
func Test_Enqueue(t *testing.T) {
 payload := map[string]interface{}{"user_id"1"message""i'm immediately message"}
 task := asynq.NewTask("msg", payload)
 res, err := c.Enqueue(task)
 if err != nil {
  t.Errorf("could not enqueue task: %v", err)
  t.FailNow()
 }
 fmt.Printf("Enqueued Result: %+v\n", res)
}

// 延时消费
func Test_EnqueueDelay(t *testing.T) {
 payload := map[string]interface{}{"user_id"1"message""i'm delay 5 seconds message"}
 task := asynq.NewTask("msg", payload)
 res, err := c.Enqueue(task, asynq.ProcessIn(5*time.Second))
 // res, err := c.Enqueue(task, asynq.ProcessAt(time.Now().Add(5*time.Second)))
 if err != nil {
  t.Errorf("could not enqueue task: %v", err)
  t.FailNow()
 }
 fmt.Printf("Enqueued Result: %+v\n", res)
}

// 超时、重试、过期
func Test_EnqueueOther(t *testing.T) {
 payload := map[string]interface{}{"user_id"1"message""i'm delay 5 seconds message"}
 task := asynq.NewTask("msg", payload)
 // 10秒超时,最多重试3次,20秒后过期
 res, err := c.Enqueue(task, asynq.MaxRetry(3), asynq.Timeout(10*time.Second), asynq.Deadline(time.Now().Add(20*time.Second)))
 if err != nil {
  t.Errorf("could not enqueue task: %v", err)
  t.FailNow()
 }
 fmt.Printf("Enqueued Result: %+v\n", res)
}

如何测试

先将服务运行起来

$ go version
go version go1.16 darwin/amd64
$ go run .

运行指定测试

$ go test -timeout 30s -run ^Test_Enqueue$ asynq_test -v -count=1

=== RUN   Test_Enqueue
Enqueued Result: &{ID:683d8f36-f8c5-49c0-88b4-f1aefa7686de EnqueuedAt:2021-06-11 10:41:49.018475 +0000 UTC ProcessAt:2021-06-11 18:41:49.017778 +0800 CST m=+0.000892619 Retry:25 Queue:default Timeout:30m0s Deadline:1970-01-01 08:00:00 +0800 CST}
--- PASS: Test_Enqueue (0.00s)
PASS
ok   asynq_test 0.009s

队列管理

Asynq提供了webui 和 命令行工具asynq

webui

Asynqmon webui在这个仓库里

$ ./asynqmon --port=3000 --redis-addr=localhost:6380

img

asynq命令行

$ asynq -p Yourpassword stats
Task Count by State
active      pending   scheduled  retry  archived
----------  --------  ---------  -----  ----
0           0         0          0      0

Task Count by Queue
default
-------
0

Daily Stats 2021-06-11 UTC
processed  failed  error rate
---------  ------  ----------
4          0       0.00%

Redis Info
version  uptime  connections  memory usage  peak memory usage
-------  ------  -----------  ------------  -----------------
6.2.0    0 days  5            16.04MB       16.14MB

更多阅读

完整代码官方文档:https://github.com/hibiken/asynq


还想了解更多吗?

更多请查看:https://github.com/hibiken/asynq 

欢迎加入我们GOLANG中国社区:https://gocn.vip/


《酷Go推荐》招募:


各位Gopher同学,最近我们社区打算推出一个类似GoCN每日新闻的新栏目《酷Go推荐》,主要是每周推荐一个库或者好的项目,然后写一点这个库使用方法或者优点之类的,这样可以真正的帮助到大家能够学习到

新的库,并且知道怎么用。


大概规则和每日新闻类似,如果报名人多的话每个人一个月轮到一次,欢迎大家报名!(报名地址:https://wj.qq.com/s2/7734329/3f51)


扫码也可以加入 GoCN 的大家族哟~


 Gopher China2021大会日程详情来了!



点击下方「阅读原文」即可报名参加大会


浏览 181
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报