Airflow 基础 | Airflow 基础系列-04:调度周期介绍
共 4789字,需浏览 10分钟
·
2021-12-27 17:06
本文翻译自Chengzhi Zhao所写的Airflow Schedule Interval 101
原文和图片链接 https://towardsdatascience.com/airflow-schedule-interval-101-bbdda31cc463
Airflow的运行周期(schedule interval)是一个较难理解的概念,甚至说,对于已经使用了Airflow一段时间的开发人员来说,都是具有挑战性的。在StackOverflow上时常会有一个问题被问起:“为什么我的DAG没有像预期的那样运行呢?”这个问题通常显示出对Airflow运行周期的错误理解。在这篇文章里,我们将举例讨论如何设置Airflow的运行周期,设置好Airflow运行周期后,你将会得到什么样的预期结果,以及如何debug你的Airflow运行周期问题。
Airflow是如何调度DAGs的?
事先需要说的一点是,Airflow不是用来解决流式问题的。人们通常使用它作为ETL工具。Airflow自身的调度器采用了和cron一样的调度周期语法。对它来说,最细的时间区间是分钟。在整个调度系统中,一直在持续运行的,只有调度器本身。
但是,作为一个非实时的解决方案,Airflow不会一直触发并且监控你的DAGs。它会周期性地监控。监控周期可以通过scheduler_heartbeat_sec来设置。在生产环境中,建议将这个值设定到60秒以上,从而避免一些不可预料的结果。为了预防程序崩溃的发生,Airflow需要一个后端的数据库来跟踪所有的进程。如果设置更小的心跳监测时间意味着Airflow的调度器需要更频繁地检查它是否需要触发新的任务,这样的话,Airflow的调度器和它后端的数据库就要承受更大的压力。
总结来说,Airflow调度器会遵循心跳周期,迭代所有的DAGs,计算好他们下次的调度时间,并且和钟表时间对比,决定当下是否要触发某个DAG。
为什么需要一个start_date?
每个DAG都有它的调度时间,简单来来说start_date是某个DAG该被Airflow调度器开始调度的时间,它也帮助开发者在生产日期前发布这个DAG。在Airflow 1.8 版本前,你可以更加动态化地设定这个参数。但是,推荐来说,还是设定一个固定的时间更好,详情请见链接:“Less forgiving scheduler on dynamic start_date”.
我们该用哪个时区呢?
Airflow底层架构初期只使用UTC。即便现在你可以把Airflow时区设置成你的本地时间,绝大多数时候Airflow还是部署在UTC时区下。将Airflow设置成UTC时区在跨时区作业或者遇到冬夏令时交替的偶发事件时更加方便。
如何设定Airflow调度周期?
你可能已经熟悉Airflow DAG的语法了,通常只要在DAG文件中的args 里同时设定好start_date和scheduler_interval就可以了。
from airflow import DAG
from datetime import datetime, timedeltadefault_args = {
'owner': 'XYZ',
'start_date': datetime(2020, 4, 1),
'schedule_interval': '@daily',
}dag = DAG('tutorial', catchup=False, default_args=default_args)
该如何设定schedule_interval呢?
这个链接Airflow Scheduler对于可以设定的值提供了更详细的讲解。知道如何使用cron表达式来设定调度周期是有必要的。如果你觉得自己搞不清怎么写cron表达式,这个链接会翻译你填写的cron表达式crontab guru。Airflow也为用户提供了更加友好的语法,例如@daily和@weekly。但我觉得它们和cron表达式比起来不够清晰。而且只是特定的一些周期有这样的用户友好语法。它们的底层依然是通过crontab来实现的。所以你最好还是学习一下cron的写法并且坚持使用它。补充一点:如果你只是想触发一下你的DAG,只需要手动设定:schedule_interval:None。
execution_date和start_date有什么区别呢?
对于调度系统而言,日期和时间是非常基础的元素。在Airflow中,有两种日期你需要格外地花时间去理解:execution_date和start_date。值得注意的是,这里说的start_date不是先前讲的那个你在DAG里设定的参数。
某个DAG的execution_date是你期望的它被触发的时间。
某个DAG的start_time是它实际被触发时的钟表时间。
一个常常被问到的问题是:为什么execution_date和start_date不一致呢?我们举例说明一下,例子中的DAG在0 2 * * *这样的周期运行,详见如下代码:
from airflow.models import DAG
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
args = {
'owner': 'Airflow',
'start_date': datetime(2020, 4, 1),
'depends_on_past': True,
}
dag = DAG(
dag_id='scheduler_interval_101',
schedule_interval='0 2 * * *',
default_args=args,
tags=['example']
)
hello_my_task = BashOperator(
task_id='hello_task',
bash_command='echo "hello_world"',
dag=dag,)
0 2 * * *意味着Airflow在每天的2:00 a.m.都会开始一个新的任务。我们让一个DAG以这样的运行周期跑几天。如果你点击Browse → Tasks Instances,你会同时看到execution_date和start_date。
我在04–10 00:05:21 (UTC)开始运行这个DAG,对于任何新的Airflow DAG来说,发生的第一件事通常是backfill(默认开启)。就如你可以在下图看到的一样,execution_date如预期般按天增加,并且时间和预期的一样。同时start_date正是Airflow调度器开始运行任务的时间。
Example of Task Instance (Image by Author)
在所有先前调度都backfill完成后,你大概会注意到,04-09这天的记录没有出现,但是就钟表时间来说,已经是04-10了,哪里出错了呢?
答案是:哪里都没有错。
首先,Airflow以ETL作业为设计出发点,对于这样类型的任务来说,通常一个批处理是跑24小时。设想一下,对于一个24小时窗口的ETL任务,你肯定会在这个窗口结束的时候才会去触发任务。这里是同样的情况,所以说,我们没有看到execution_date是04-09的记录是因为这个24h时间窗口还没有结束。通过execution_date我们可以看到,最后成功运行时间是04-08T02:00:00(注意,这里的execution_date是24h窗口的起始时间点),并且在04-09T02:00:00(不包括)结束。所以我们04-09的任务运行时间窗口是什么样的呢?是从04-09T02:00:00到04-10T02:00:00,而这个时间点还没有到。
Execution Data and Start Date Relationship (Image by Author)
Airflow调度器什么时候跑这个04-09的调度呢?答案是04-10 02:00:00(钟表时间)。一旦04-09任务调度被触发了,你将会看到04-09T02:00:00这样的execution_date和类似04-10T02:01:15(根据Airflow决定何时触发这个任务而略微变化,这个下面会细说)这样的记录。综上所述,你可以很容易理解execution_date和start_date是不一样的。知道这样的区别对于基于execution_date使用类似{{ds}}这样的时间模板是很有好处的。
另外一个角度可以这样去想,execution_date接近于上一次调度的start_date。我们用一个更加复杂的例子:0 2 * * 4,5,6这个cron表达式代表了在每个周四,周五,周六的02:00运行。
下图的日历(黑色部分)代表着钟表时间,或者说,start_date,红色代表着execution_date,即预期的调度时间。如果你有这样的调度周期,你不应该因为Airflow在04-09触发04-04的DAG而感到惊讶。
Example of Schedule Interval (Image by Author)
为什么在触发DAGs的时候会有短暂的延迟呢?
通过上述示例,尽管我们发现excution_date和start_date在日期上的差异,但为什么具体的时刻上也有细微的不同呢?举例说,每日为周期运行的execution_date为04-09T02:00:00对应的start_date是04-10T02:01:15。为什么会有1.25分钟的延迟呢?打个比方,假设我们日常线上开会定在每周一10:00:00 a.m.(scheduler_interval),在这周一10:00:00 a.m.(execution_date),你收到了来自日历app的加入会议的提醒,然后你点击了会议的链接并且开始你的线上会议。当你进入到虚拟会议室时,会议开始,时间为10:01:15 a.m.(start_date)。
你大概已经注意到了这个在execution_date和start_date之间在时刻上微小的延迟。理论上讲,这两个时间应该是完全一样的,但事实不是这样。为什么?因为如同我们一开始所说,Airflow的调度器不会一直监视DAGs。调度器等到心跳信号的时候才会检查并执行调度任务,这个过程会造成延迟。并且,就算调度器在完全精准的时间触发了任务,你也要考虑代码执行和数据库更新的延迟。所有上述的因素共同造成了调度时的细微延迟。
最后的思考
我希望这篇文章可以讲清楚Airflow调度周期是怎么一回事。从Airflow内部来讲,它是一个复杂的系统。但是对于用户来说,它使用起来却非常地直接明了。以ETL为出发点,用户需要一些时间去理解Airflow调度器是如何处理调度周期。一旦你对Airflow的调度周期有了更好的理解,编写一个在预定周期内运行的DAG将畅通无阻。