糖尿病康复,内容丰富有趣,生活中的好帮手!
糖尿病康复 > 十八 可视化任务调度系统airflow

十八 可视化任务调度系统airflow

时间:2022-09-22 05:56:54

相关推荐

十八 可视化任务调度系统airflow

最近工作需要,使用airflow搭建了公司的ETL系统,顺带在公司分享了一次airflow,整理成文,Enjoy!

1. airflow 介绍

1.1 airflow 是什么

Airflow is a platform to programmatically author, schedule and monitor workflows.

airflow 是一个编排、调度和监控workflow的平台,由Airbnb开源,现在在Apache Software Foundation 孵化。airflow 将workflow编排为tasks组成的DAGs,调度器在一组workers上按照指定的依赖关系执行tasks。同时,airflow 提供了丰富的命令行工具和简单易用的用户界面以便用户查看和操作,并且airflow提供了监控和报警系统。

1.2 airflow 核心概念

DAGs:即有向无环图(Directed Acyclic Graph),将所有需要运行的tasks按照依赖关系组织起来,描述的是所有tasks执行的顺序。Operators:可以简单理解为一个class,描述了DAG中一个具体的task具体要做的事。其中,airflow内置了很多operators,如BashOperator执行一个bash 命令,PythonOperator调用任意的Python 函数,EmailOperator用于发送邮件,HTTPOperator用于发送HTTP请求,SqlOperator用于执行SQL命令...同时,用户可以自定义Operator,这给用户提供了极大的便利性。Tasks:Task 是 Operator的一个实例,也就是DAGs中的一个node。Task Instance:task的一次运行。task instance 有自己的状态,包括"running", "success", "failed", "skipped", "up for retry"等。Task Relationships:DAGs中的不同Tasks之间可以有依赖关系,如TaskA >> TaskB,表明TaskB依赖于TaskA。

通过将DAGs和Operators结合起来,用户就可以创建各种复杂的 workflow了。

1.3 其它概念

Connections: 管理外部系统的连接信息,如外部MySQL、HTTP服务等,连接信息包括conn_idhostnameloginpasswordschema等,可以通过界面查看和管理,编排workflow时,使用conn_id进行使用。Pools: 用来控制tasks执行的并行数。将一个task赋给一个指定的pool,并且指明priority_weight,可以干涉tasks的执行顺序。XComs:在airflow中,operator一般(not always)是原子的,也就是说,他们一般独立执行,同时也不需要和其他operator共享信息,如果两个operators需要共享信息,如filename之类的, 推荐将这两个operators组合成一个operator。如果实在不能避免,则可以使用XComs (cross-communication)来实现。XComs用来在不同tasks之间交换信息。Trigger Rules:指task的触发条件。默认情况下是task的直接上游执行成功后开始执行,airflow允许更复杂的依赖设置,包括all_success(所有的父节点执行成功),all_failed(所有父节点处于failed或upstream_failed状态),all_done(所有父节点执行完成),one_failed(一旦有一个父节点执行失败就触发,不必等所有父节点执行完成),one_success(一旦有一个父节点执行成功就触发,不必等所有父节点执行完成),dummy(依赖关系只是用来查看的,可以任意触发)。另外,airflow提供了depends_on_past,设置为True时,只有上一次调度成功了,才可以触发。

2. 示例

先来看一个简单的DAG。图中每个节点表示一个task,所有tasks组成一个DAG,各个tasks之间的依赖关系可以根据节点之间的线看出来。

DAGs

2.1 实例化DAG

# -*- coding: UTF-8 -*-## 导入airflow需要的modulesfrom airflow import DAGfrom datetime import datetime, timedeltadefault_args = {'owner': 'lxwei','depends_on_past': False, # 如上文依赖关系所示'start_date': datetime(, 1, 17), # DAGs都有个参数start_date,表示调度器调度的起始时间'email': ['lxwei@'], # 用于alert'email_on_failure': True,'email_on_retry': False,'retries': 3, # 重试策略'retry_delay': timedelta(minutes=5)}dag = DAG('example-dag', default_args=default_args, schedule_interval='0 0 * * *')

在创建DAGs时,我们可以显示的给每个Task传递参数,但通过default_args,我们可以定义一个默认参数用于创建tasks。

注意,schedule_interval 跟官方文档不一致,官方文档的方式已经被deprecated

2.2 定义依赖关系

这个依赖关系是我自己定义的,key表示某个taskId,value里的每个元素也表示一个taskId,其中,key依赖value里的所有task。

"dependencies": {"goods_sale_2": ["goods_sale_1"], # goods_sale_2 依赖 goods_sale1"shop_sale_1_2": ["shop_sale_1_1"],"shop_sale_2_2": ["shop_sale_2_1"],"shop_sale_2_3": ["shop_sale_2_2"],"etl_task": ["shop_info", "shop_sale_2_3", "shop_sale_realtime_1", "goods_sale_2", "shop_sale_1_2"],"goods_sale_1": ["timelySalesCheck", "productDaySalesCheck"],"shop_sale_1_1": ["timelySalesCheck", "productDaySalesCheck"],"shop_sale_realtime_1": ["timelySalesCheck", "productDaySalesCheck"],"shop_sale_2_1": ["timelySalesCheck", "productDaySalesCheck"],"shop_info": ["timelySalesCheck", "productDaySalesCheck"]}

2.3 定义tasks和依赖关系

首先,实例化operators,构造tasks。如代码所示,其中,EtlTaskMySQLToWebDataTransferMySQLSelector是自定义的三种Operator,根据taskType实例化operator,并存放到taskDict中,便于后期建立tasks之间的依赖关系。

for taskConf in tasksConfs:taskType = taskConf.get("taskType")if taskType == "etlTask":task = EtlTask(task_id=taskConf.get("taskId"),httpConnId=httpConn,etlId=taskConf.get("etlId"),dag=dag)taskDict[taskConf.get("taskId")] = taskelif taskType == "MySQLToWebDataTransfer":task = MySqlToWebdataTransfer(task_id = taskConf.get("taskId"),sql= taskConf.get("sql"),tableName=taskConf.get("tableName"),mysqlConnId =mysqlConn,httpConnId=httpConn,dag=dag)taskDict[taskConf.get("taskId")] = taskelif taskType == "MySQLSelect":task = StatusChecker(task_id = taskConf.get("taskId"),mysqlConnId = mysqlConn,sql = taskConf.get("sql"),dag = dag)taskDict[taskConf.get("taskId")] = taskelse:logging.error("error. TaskType is illegal.")

构建tasks之间的依赖关系,其中,dependencies中定义了上面的依赖关系,A >> B表示A是B的父节点,相应的,A << B表示A是B的子节点。

for sourceKey in dependencies:destTask = taskDict.get(sourceKey)sourceTaskKeys = dependencies.get(sourceKey)for key in sourceTaskKeys:sourceTask = taskDict.get(key)if (sourceTask != None and destTask != None):sourceTask >> destTask

3. 常用命令

命令行输入airflow -h,得到帮助文档

backfill Run subsections of a DAG for a specified date rangelist_tasksList the tasks within a DAGclearClear a set of task instance, as if they never ranpausePause a DAGunpause Resume a paused DAGtrigger_dag Trigger a DAG runpoolCRUD operations on poolsvariables CRUD operations on variableskerberos Start a kerberos ticket renewerrender Render a task instance's template(s)run Run a single task instanceinitdb Initialize the metadata databaselist_dags List all the DAGsdag_state Get the status of a dag runtask_failed_deps Returns the unmet dependencies for a task instancefrom the perspective of the scheduler. In other words,why a task instance doesn't get scheduled and thenqueued by the scheduler, and then run by an executor).task_stateGet the status of a task instanceserve_logsServe logs generate by workertestTest a task instance. This will run a task withoutchecking for dependencies or recording it's state inthe database.webserver Start a Airflow webserver instanceresetdb Burn down and rebuild the metadata databaseupgradedb Upgrade the metadata database to latest versionscheduler Start a scheduler instanceworker Start a Celery worker nodeflower Start a Celery Flowerversion Show the versionconnections List/Add/Delete connections

其中,使用较多的是backfillruntestwebserverscheduler。其他操作在web界面操作更方便。另外,initdb用于初始化metadata,使用一次即可;resetdb会重置metadata,清除掉数据(如connection数据), 需要慎用。

4. 问题

在使用airflow过程中,曾把DAGs里的task拆分得很细,这样的话,如果某个task失败,重跑的代价会比较低。但是,在实践中发现,tasks太多时,airflow在调度tasks会很低效,airflow一直处于选择待执行的task的过程中,会长时间没有具体task在执行,从而整体执行效率大幅降低。

5. 总结

airflow 很好很强大。如果只是简单的ETL之类的工作,可以很容易的编排。调度灵活,而且监控和报警系统完备,可以很方便的投入生产环节。

6. 参阅

airflow 官网

github

如果觉得《十八 可视化任务调度系统airflow》对你有帮助,请点赞、收藏,并留下你的观点哦!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。