基于 CRON 库扩展的分布式 Crontab 的实现

共 22638字,需浏览 46分钟

 ·

2022-07-17 17:31

作者:熊喵君,原文链接:https://pandaychen.github.io/2022/01/16/A-GOLANG-CRONTAB-V3-ANALYSIS/

0x00 前言

cron[1] 是一个用于管理定时任务的库(单机),基于 Golang 实现 Linux 中 crontab 的功能

0x01 使用

Linux 的 crontab

crontab 基本格式:

# 文件格式說明
# ┌──分钟(0 - 59)
# │ ┌──小时(0 - 23)
# │ │ ┌──日(1 - 31)
# │ │ │ ┌─月(1 - 12)
# │ │ │ │ ┌─星期(0 - 6,表示从周日到周六)
# │ │ │ │ │
# * * * * * 被执行的命令

基础例子

用法极丰富,V3 版本也支持标准的 crontab 格式,具体用法细节可以参考 此文[2]

func main() {
    job := cron.New(
        cron.WithSeconds(), // 添加秒级别支持,默认支持最小粒度为分钟(如需秒级精度则必须设置)
    )
    // 每秒钟执行一次
    job.AddFunc("* * * * * *"func() {
        fmt.Printf("task run: %v\n", time.Now())
    })
    job.Run()   // 启动
}

其他典型的用法还有如下:

type cronJobDemo int

func (c cronJobDemo) Run() {
        fmt.Println("5s func trigger")
        return
}

func main() {
    c := cron.New(
            cron.WithSeconds(),
    )
    c.AddFunc("0 * * * *"func() { fmt.Println("Every hour on the half hour") })
    c.AddFunc("30 3-6,20-23 * * *"func() { fmt.Println(".. in the range 3-6am, 8-11pm") })
    c.AddFunc("CRON_TZ=Asia/Tokyo 30 04 * * *"func() { fmt.Println("Runs at 04:30 Tokyo time every day") })
    c.AddFunc("@every 5m"func() { fmt.Println("every 5m, start 5m fron now") }) // 容易理解的格式
    // 通过 AddJob 注册
    var cJob cronJobDemo
    c.AddJob("@every 5s", cJob)
    c.Start()
    // c.Stop()

    select {}
}

0x02 代码分析

核心数据结构

对于 cron 库的整体逻辑,最关键的两个数据结构就是 EntryCron

1、Job:抽象一个定时任务,cron 调度一个 Job,就去执行 JobRun() 方法

type Job interface {
    Run()
}

FuncJobFuncJob 实际就是一个 func() 类型,实现了 Run() 方法:

type FuncJob func()
func (f FuncJob) Run() { 
    f() 
}

在实际应用中,我们需要对 Job 结构做一些扩展,于是就有了 JobWrapper,使用修饰器机制加工 Job(传入一个 Job,返回一个 Job),有点像 gin 中间件,包装器可以在执行实际的 Job 前后添加一些逻辑,然后使用一个 Chain 将这些 JobWrapper 组合到一起。

比如给 Job 添加这样一些属性:

  • Job 回调方法中捕获 panic 异常
  • 如果 Job 上次运行还未结束,推迟本次执行
  • 如果 Job 上次运行还未结束,跳过本次执行
  • 记录每个 Job 的执行情况
type JobWrapper func(Job) Job

type Chain struct {
  wrappers []JobWrapper
}

func NewChain(c ...JobWrapper) Chain {
  return Chain{c}
}

2、Chain 结构ChainJobWrapper 的数组,调用 Chain 对象的 Then(j Job) 方法应用这些 JobWrapper,返回最终的 Job

type Chain struct {
  wrappers []JobWrapper
}

func NewChain(c ...JobWrapper) Chain {
  return Chain{c}
}

func (c Chain) Then(j Job) Job {
  for i := range c.wrappers {
      // 注意:应用 JobWrapper 的顺序
    j = c.wrappers[len(c.wrappers)-i-1](j "len(c.wrappers "len(c.wrappers)-i-1")-i-1")
  }
  return j
}

3、Schedule:描述一个 job 如何循环执行的抽象,需要实现Next方法,此方法返回任务下次被调度的时间

// Schedule describes a job's duty cycle.
type Schedule interface {
 // Next returns the next activation time, later than the given time.
 // Next is invoked initially, and then each time the job is run.
 Next(time.Time) time.Time
}

Scheduler 的实例化结构有:

  • ConstantDelaySchedule实现[3]
  • SpecSchedule实现[4],默认选择,提供了对 Cron 表达式的解析能力

4、Entry 结构:抽象了一个 job 每当使用 AddJob 注册一个定时调用策略,就会为该策略生成唯一的 EntryEntry 里会存储被执行的时间、需要被调度执行的实体 Job

type Entry struct {
    ID EntryID          // job id,可以通过该 id 来删除 job
    Schedule Schedule   // 用于计算 job 下次的执行时间
    Next time.Time      // job 下次执行时间
    Prev time.Time      // job 上次执行时间,没执行过为 0
    WrappedJob Job      // 修饰器加工过的 job
    Job Job             // 未经修饰的 job,可以理解为 AddFunc 的第二个参数
}

5、Cron结构[5]:关于 Cron 结构,有一些细节,entries 为何设计为一个指针 slice

// Cron keeps track of any number of entries, invoking the associated func as
// specified by the schedule. It may be started, stopped, and the entries may
// be inspected while running.
type Cron struct {
    entries   []*Entry          // 所有 Job 集合
    chain     Chain             // 装饰器链
    stop      chan struct{}     // 停止信号
    add       chan *Entry       // 用于异步增加 Entry
    remove    chan EntryID      // 用于异步删除 Entry
    snapshot  chan chan []Entry
    running   bool              // 是否正在运行
    logger    Logger
    runningMu sync.Mutex        // 运行时锁
    location  *time.Location    // 时区相关
    parser    Parser            // Cron 解析器
    nextID    EntryID
    jobWaiter sync.WaitGroup    // 并发控制,正在运行的 Job
}

entries 成员

刚才说到 entries 为何设计为指针 slice,原因在于 cron 核心逻辑中,每次循环开始时都会对 Cron.entries 进行排序,排序字段依赖于每个 Entry 结构的 Next 成员,排序依赖于下面的原则:

  1. 按照触发时间正向排序,越先触发的越靠前
  2. IsZero 的任务向后面排
  3. 由于可能存在相同周期的任务 Job,所以排序是不稳定的
// byTime is a wrapper for sorting the entry array by time
// (with zero time at the end).
type byTime []*Entry

func (s byTime) Len() int      { return len(s) }
func (s byTime) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s byTime) Less(i, j int) bool {
 // Two zero times should return false.
 // Otherwise, zero is "greater" than any other time.
 // (To sort it at the end of the list.)
 if s[i].Next.IsZero() {
  return false
 }
 if s[j].Next.IsZero() {
  return true
 }
    // 排序的原则,s[i] 比 s[j] 先触发
 return s[i].Next.Before(s[j].Next)
}

0x03 内置 JobWrapper 介绍

Recover:捕捉 panic,避免进程异常退出

此 wrapper 比较好理解,在执行内层的 Job 逻辑前,添加 recover() 调用。如果 Job.Run() 执行过程中有 panic。这里的 recover() 会捕获到,输出调用堆栈

// cron.go
func Recover(logger Logger) JobWrapper {
  return func(j Job) Job {
    return FuncJob(func() {
      defer func() {
        if r := recover(); r != nil {
          const size = 64 << 10
          buf := make([]byte, size)
          buf = buf[:runtime.Stack(buf, false)]
          err, ok := r.(error)
          if !ok {
            err = fmt.Errorf("%v", r)
          }
          logger.Error(err, "panic""stack""...\n"+string(buf))
        }
      }()
      j.Run()
    })
  }
}

DelayIfStillRunning

实现了已有任务运行推迟的逻辑。核心是通过一个(任务共用的)互斥锁 sync.Mutex,每次执行任务前获取锁,执行结束之后释放锁。所以在上一个任务结束前,下一个任务获取锁会阻塞,从而保证的任务的串行执行。

// chain.go
func DelayIfStillRunning(logger Logger) JobWrapper {
  return func(j Job) Job {
    var mu sync.Mutex
    return FuncJob(func() {
      start := time.Now()
      // 下一个任务阻塞等待获取锁
      mu.Lock()
      defer mu.Unlock()
      if dur := time.Since(start); dur > time.Minute {
        logger.Info("delay""duration", dur)
      }
      j.Run()
    })
  }
}

SkipIfStillRunning

DelayIfStillRunning 机制不一样,该方法是跳过执行,通过无缓冲 channel 机制实现。执行任务时,从通道中取值,如果成功,执行,否则跳过。执行完成之后再向通道中发送一个值,确保下一个任务能执行。初始发送一个值到通道中,保证第一个任务的执行。

func SkipIfStillRunning(logger Logger) JobWrapper {
  return func(j Job) Job {
    // 定义一个无缓冲 channel
    var ch = make(chan struct{}, 1)
    ch <- struct{}{}
    return FuncJob(func() {
      select {
      case v := <-ch:
        j.Run()
        ch <- v
      default:
        logger.Info("skip")
      }
    })
  }
}

0x04 核心方法分析

AddJob 方法

AddJob 方法通过两种方法将任务节点 entry 添加到 Cron.entries 中:

  1. 初始化时,直接 append
  2. 运行状态下,通过 channel 方式异步添加,避免加锁
// AddJob adds a Job to the Cron to be run on the given schedule.
// The spec is parsed using the time zone of this Cron instance as the default.
// An opaque ID is returned that can be used to later remove it.
func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) {
 schedule, err := c.parser.Parse(spec)
 if err != nil {
  return 0, err
 }
 return c.Schedule(schedule, cmd), nil
}

// Schedule adds a Job to the Cron to be run on the given schedule.
// The job is wrapped with the configured Chain.
func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
 c.runningMu.Lock()
 defer c.runningMu.Unlock()
 c.nextID++
 entry := &Entry{
  ID:         c.nextID,
  Schedule:   schedule,
  WrappedJob: c.chain.Then(cmd),
  Job:        cmd,
 }
 if !c.running {
        // 直接加
  c.entries = append(c.entries, entry)
 } else {
        // 异步
  c.add <- entry
 }
 return entry.ID
}

run 方法

run 方法

cron 的核心 run() 方法的实现如下,这个是很经典的 for-select 异步处理模型,避免的对 entries 加锁,非常值得借鉴。其核心有如下几点:

  1. 一个定时任务(集)的实现,内部采用排序数组,取数组首位元素的时间作为timer触发时间(感觉可以优化为最小堆?)

    • 每个 entry 都包含了该 entry 下一次执行的绝对时间,本轮执行完成后立即计算下一轮时间,等待下次循环时排序更新
    • 每次循环开始对 cron.entries 按下次执行时间升序排序,只需要对第一个 entry 启动定时器即可
    • 定时器事件触发时,轮询 cron.entries 里需要执行的 entries 直到第一个不满足条件的,由于数组是升序,后面无需再遍历
    • 同时,第一个定时器处理结束开启下次定时器时,也只需要更新执行过的 cron.entriesNext(下次执行时间),不需要更新所有的 cron.entries
  2. Cron内部数据结构的维护,采用channel实现无锁机制,缺点是可能会有误差(ms级),不过在此项目是能够容忍的,以 Job

    异步添加为例(运行中添加entry,走异步方式,有duration的延迟):

    • 某个 Job 之间的 delta 差,可能多出了 duration 的延迟,可以容忍
    • 定时器实现里,会扫描所有当前时间之前的 cron.entries 来执行,增加了容错
func (c *Cron) run() {
    c.logger.Info("start")

    // 初始化,计算每个 Job 下次的执行时间
    now := c.now()
    for _, entry := range c.entries {
        entry.Next = entry.Schedule.Next(now)
        c.logger.Info("schedule""now", now, "entry", entry.ID, "next", entry.Next)
    }

    // 在 dead loop,进行任务调度
    for {
        // 根据下一次的执行时间,对所有 Job 排序
        sort.Sort(byTime(c.entries))

        // 计时器,用于没有任务可调度时的阻塞操作
        var timer *time.Timer
        if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
            // 无任务可调度,设置计时器到一个很大的值,把下面的 for 阻塞住
            timer = time.NewTimer(100000 * time.Hour)
        } else {
            // 有任务可调度了,计时器根据第一个可调度任务的下次执行时间设置
            // 排过序,所以第一个肯定是最先被执行的
            timer = time.NewTimer(c.entries[0].Next.Sub(now))
        }

        for {
            select {
            // 有 Job 到了执行时间
            case now = <-timer.C:
                now = now.In(c.location)
                c.logger.Info("wake""now", now)
                // 检查所有 Job,执行到时的任务
                for _, e := range c.entries {
                    // 可能存在相同时间出发的任务
                    if e.Next.After(now) || e.Next.IsZero() {
                        // 后面都不需要遍历了!
                        break
                    }
                    // 执行 Job 的 func()
                    c.startJob(e.WrappedJob)

                    // 保存上次执行时间
                    e.Prev = e.Next
                    // 设置 Job 下次的执行时间
                    e.Next = e.Schedule.Next(now)
                    c.logger.Info("run""now", now, "entry", e.ID, "next", e.Next)
                }

            // 添加新 Job
            case newEntry := <-c.add:
                timer.Stop()        // 必须注意,这里停止定时器,避免内存泄漏!
                now = c.now()
                newEntry.Next = newEntry.Schedule.Next(now)
                c.entries = append(c.entries, newEntry)
                c.logger.Info("added""now", now, "entry", newEntry.ID, "next", newEntry.Next)

            // 获取所有 Job 的快照
            case replyChan := <-c.snapshot:
                replyChan <- c.entrySnapshot()
                continue

            // 停止调度
            case <-c.stop:
                timer.Stop()
                c.logger.Info("stop")
                return

            // 根据 entryId 删除一个 Job
            case id := <-c.remove:
                timer.Stop()
                now = c.now()
                c.removeEntry(id)
                c.logger.Info("removed""entry", id)
            }

            break
        }
    }
}

上述的代码的核心流程如下图:

image

0x05 小结

本文分析了基于 Golang 实现的单机定时任务库。

0x06 参考

  • golang cron v3 定时任务[6]
  • v3-repo[7]
  • Go 每日一库之 cron[8]
  • GO 编程模式:修饰器[9]

参考资料

[1]

cron: https://github.com/robfig/cron/

[2]

此文: https://segmentfault.com/a/1190000023029219

[3]

实现: https://github.com/robfig/cron/blob/v3/constantdelay.go

[4]

实现: https://pandaychen.github.io/2021/10/05/A-GOLANG-CRONTAB-V3-BASIC-INTRO/

[5]

结构: https://github.com/robfig/cron/blob/v3/cron.go#L13

[6]

golang cron v3 定时任务: https://blog.cugxuan.cn/2020/06/04/Go/golang-cron-v3/

[7]

v3-repo: https://github.com/robfig/cron/tree/v3

[8]

Go 每日一库之 cron: https://segmentfault.com/a/1190000023029219

[9]

GO 编程模式:修饰器: https://coolshell.cn/articles/17929.html



推荐阅读


福利

我为大家整理了一份从入门到进阶的Go学习资料礼包,包含学习建议:入门看什么,进阶看什么。关注公众号 「polarisxu」,回复 ebook 获取;还可以回复「进群」,和数万 Gopher 交流学习。

浏览 5
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报