Airflow 基础 | Airflow 基础系列-01:Airflow组件
共 3438字,需浏览 7分钟
·
2021-12-27 17:05
Apache Airflow 微信公众号是由一群Airflow的爱好者一起维护的,我们旨在普及Airflow知识,给广大Airflow用户搭建一个交流的平台。
前期我们会发布一些Airflow的基本知识文章。本文是Apache Airflow 公众号的开篇之作,主要介绍Airflow的整体架构以及主要的组件。
Airflow 架构
下图是Airflow的架构图,
我们可以看到Airflow主要有4个组件:
WebServer
Scheduler
Executor
Database
WebServer - Airflow UI
WebServer 实际上是一个Python的Flask app。你可以在Airflow WebServer的UI界面上看到调度作业的状态。作业的信息都存储在数据库里,WebServer负责查询数据库并在页面上展示作业信息。WebServer 组件也负责读取并展示Remote的作业日志(Airflow的作业日志可以存放在S3,Google Cloud Storage,AzureBlobs,ElasticsSearch等等)。
Scheduler - 作业调度器
Scheduler 是一个多线程的Python进程。Scheduler 通过检查DAG的task依赖关系以及数据库里各个task的状态来决定接下来跑哪个task,什么时候跑以及在哪里跑。
Executor - 作业执行器
Airflow 支持以下4种类型的 Executor
SequentialExecutor 按照线性的方式运行task,没有并发和并行。一般用在开发环境里用。
LocalExecutor 支持并行和多行程,一般用在单节点的Airflow里。
CeleryExecutor 是在分布式环境下执行器。但是依赖第三方的message queue组件来调度task到worker节点,message queue可以用Redis,RabbitMQ。
KubernetesExecutor 是 Airfow 1.10新引入的一个执行器,主要用在K8s环境里。
Metadata Database - 元数据数据库
元数据数据库可以是任何支持 SQLAlchemy的数据库(比如Postgres,MySql)。Scheduler 通过修改数据库来更新task状态,WebServer会读取数据库来展示作业状态
Airflow 是如何调度的
Schduler 会扫描dags文件夹,在元数据数据库里创建DAG对应的记录。根据配置,每一个DAG都会分配若干个进程。
每个进程都会扫描对应的DAG文件,根据调度配置参数创建DagRuns。然后每一个满足被调度条件的Task都会实例出来一个TaskInstance,TaskInstance会被初始化为Scheduled的状态,并在数据库里更新。
Scheduler 进程查询数据库拿到所有Scheduled状态的tasks,并把他们发送到Executor(对应TaskInstance的状态更新为QUEUED)
Worker会从queue里拉取task并执行。TaskInstance的状态由QUEUED转变为 RUNNING
当一个task结束了,worker会把task状态更新为对应的结束状态(FINISHED,FAILED,等等),Scheduler会在数据库里更新对应的状态。
# https://github.com/apache/incubator-airflow/blob/2d50ba43366f646e9391a981083623caa12e8967/airflow/jobs.py#L1386
def _process_dags(self, dagbag, dags, tis_out):
"""
Iterates over the dags and processes them. Processing includes:
1. Create appropriate DagRun(s) in the DB.
2. Create appropriate TaskInstance(s) in the DB.
3. Send emails for tasks that have missed SLAs.
:param dagbag: a collection of DAGs to process
:type dagbag: models.DagBag
:param dags: the DAGs from the DagBag to process
:type dags: DAG
:param tis_out: A queue to add generated TaskInstance objects
:type tis_out: multiprocessing.Queue[TaskInstance]
:return: None
"""
for dag in dags:
dag = dagbag.get_dag(dag.dag_id)
if dag.is_paused:
self.log.info("Not processing DAG %s since it's paused", dag.dag_id)
continue
if not dag:
self.log.error("DAG ID %s was not found in the DagBag", dag.dag_id)
continue
self.log.info("Processing %s", dag.dag_id)
dag_run = self.create_dag_run(dag)
if dag_run:
self.log.info("Created %s", dag_run)
self._process_task_instances(dag, tis_out)
self.manage_slas(dag)
models.DagStat.update([d.dag_id for d in dags])
Airflow 组件配置
配置Airflow 组件之间的交互是同一个airflow.cfg 文件控制的,这个文件里本身有对各种配置的说明文档,这里我们介绍一些常用的配置项:
Parallelism
以下3个参数可以用来控制task的并行度。
parallelism
, dag_concurrency
and max_active_runs_per_dag
parallelism是指airflow executor最多运行的task数。dag_concurrenty 是指单个dag最多运行的task数。
max_active_runs_per_dag 是指最多的dag运行实例。
Scheduler
job_heartbeat_sec 是指task接受外部kill signal的频率(比如你在airflow web页面kill task),默认是5秒钟
scheduler_heartbeat_sec 是指scheduler 触发新task的时间隔间,默认是5秒。
还有更多配置会在后面的文章里介绍,也希望大家多多支持Airflow公众号,后续我们更新更多有关Airflow的文章。
本文翻译 https://www.astronomer.io/guides/airflow-components