Airflow进阶
基本命令
用户界面也有以下相关操作按钮
* 查看
1、列出现有所有的活动的DAGSairflow list_dags2、列出 tutorial 的任务idairflow list_tasks tutorial3、以树形图的形式列出 tutorial 的任务idairflow list_tasks tutorial --tree
- 测试
1、模拟2015-06-01 执行tutorial的print_date任务airflow test tutorial print_date 2015-06-01
- 回填数据
如果希望新写的DAG执行过去一段时间的任务怎么办?
backfill 可以执行一个时间段内应该执行的所有任务
airflow backfill tutorial -s 2018-06-01 -e 2015-08-01
- 重建元数据库
airflow resetdb [-h] [-y]-y --yes,不经过提示确认就重置,默认为False
更多命令 官方Command Line Interface
BaseOperator
官网例子,里面的各个属性有代表什么意思?
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedeltadefault_args = {'owner': 'airflow','depends_on_past': False,'start_date': datetime(2015, 6, 1),'email': ['airflow@example.com'],'email_on_failure': False,'email_on_retry': False,'retries': 1,'retry_delay': timedelta(minutes=5),# 'queue': 'bash_queue',# 'pool': 'backfill',# 'priority_weight': 10,# 'end_date': datetime(2016, 1, 1),
}dag = DAG('tutorial', default_args=default_args)# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(task_id='print_date',bash_command='date',dag=dag)t2 = BashOperator(task_id='sleep',bash_command='sleep 5',retries=3,dag=dag)templated_command = """{% for i in range(5) %}echo "{{ ds }}"echo "{{ macros.ds_add(ds, 7)}}"echo "{{ params.my_param }}"{% endfor %}
"""t3 = BashOperator(task_id='templated',bash_command=templated_command,params={'my_param': 'Parameter I passed in'},dag=dag)t2.set_upstream(t1)
t3.set_upstream(t1)
- retries (int) 重试几次才标记为失败
- retry_delay (timedelta) 两次重试间隔多长时间
- retry_exponential_backoff(bool)在重试延迟上运用算法增加等待时间
- max_retry_delay (timedelta) 重试之间的最大时间间隔
- start_time (datatime)确定第一个任务实例的execution_date,最佳做法是将start_date四舍五入到DAG的schedule_interval。
- end_time (datetime)如果指定,调度程序将不会超出此日期。
- depends_on_past (bool)设置为true时,任务实例将依次运行,同时依赖上一个任务的计划成功。允许start_date的任务实例运行。
- wait_for_downstream (bool)TODO
- dag (DAG) 任务所附的dag的引用(如果有的话)
parallelism:这个参数指定了整个Airflow系统,在任何一刻能同时运行的Task Instance的数量,这个数量跟DAG无关,只跟Executor和Task有关。举个例子:如果parallelism=15, 这时你有两个DAG,A和B,如果A需要同时开跑10个Task,B也要同时开跑10个Task,两个DAG同时触发,那么这时候同时在跑的Task数量只能是15,其余的5个会等之前的Task运行完了触发,这时的状态不会显示在web上。而且在这种情况下,触发的顺序是不确定的。
dag_concurrency:这个参数指定了同一个Dag Run中能同时运行的Task Instance的个数
max_active_runs_per_dag:这个参数指定了同一个Dag能被同时激活的Dag Run的数量
non_pooled_task_slot_count:这个参数指定了默认的Pool能同时运行的Task Instance的数量,如果你的Task没有指定Pool选项,那么这个Task就是属于这个默认的Pool的
- 更多配置
airflow执行组件
- Scheduler:这个是整个Airflow的调度器,Airflow所有DAG的调度过程是由Scheduler轮询来处理的。触发条件达到后,会丢给Executor执行。
- Executor:现在的Executor有三种:
- SequnceExecutor:提供本地执行,并且串行执行一个DAG中的所有Task,基本上只用在初期的Airflow概念验证阶段
- LocalExecutor:这个是比较常用的Executor,可以在本地并行执行一个DAG内的所有Task
- CeleryExecutor:这个是在大型任务调度场景,或者是表较复杂的任务分离场景中需要用到的Executor。顾名思义,在这个Executor下,Airflow使用了Celery这个强大的Python分布式队列框架去分发任务,然后在这样的环境下,需要在执行任务的机器上启用Airflow Worker来处理队列中的请求。
- 在一个Airflow中同时只能一个Executor启动,不能给指定的DAG指定Executor
- Pool:这个Pool虽然不是Airflow的核心,但也跟整个Airflow的执行流程相关。任何一个Task其实都是指定了Pool这个参数的,即使没有自己指定,其实也是归结到了Default Pool这么个池子中。Pool本身是个抽象的概念,由Slot组成,可以建立任何一个Pool,指定Slot的数量。任何一个使用了这个Pool的Task Instance就需要占用一个Slot,Slot用完了,Task就处于等待状态。
配置文件
- 配置元素优先级
环境变量
airflow.cfg中的配置
airflow.cfg中的命令
默认
数据库
- 官方推荐 MySQL or Postgres 两种数据库
- 本地配置好数据库后
在airflow.cfg 中配置“executor”为“LocalExecutor”,可以在本地并行化任务实例的执行程序。
感谢
http://wingerted.com
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
