Airflow进阶

基本命令


用户界面也有以下相关操作按钮
* 查看

1、列出现有所有的活动的DAGSairflow list_dags2、列出 tutorial 的任务idairflow list_tasks tutorial3、以树形图的形式列出 tutorial 的任务idairflow list_tasks tutorial --tree
  • 测试
1、模拟2015-06-01 执行tutorial的print_date任务airflow test tutorial print_date 2015-06-01
  • 回填数据
    如果希望新写的DAG执行过去一段时间的任务怎么办?
    backfill 可以执行一个时间段内应该执行的所有任务
airflow backfill tutorial -s 2018-06-01 -e 2015-08-01
  • 重建元数据库
airflow resetdb [-h] [-y]-y --yes,不经过提示确认就重置,默认为False
更多命令 官方Command Line Interface

BaseOperator

官网例子,里面的各个属性有代表什么意思?

"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedeltadefault_args = {'owner': 'airflow','depends_on_past': False,'start_date': datetime(2015, 6, 1),'email': ['airflow@example.com'],'email_on_failure': False,'email_on_retry': False,'retries': 1,'retry_delay': timedelta(minutes=5),# 'queue': 'bash_queue',# 'pool': 'backfill',# 'priority_weight': 10,# 'end_date': datetime(2016, 1, 1),
}dag = DAG('tutorial', default_args=default_args)# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(task_id='print_date',bash_command='date',dag=dag)t2 = BashOperator(task_id='sleep',bash_command='sleep 5',retries=3,dag=dag)templated_command = """{% for i in range(5) %}echo "{{ ds }}"echo "{{ macros.ds_add(ds, 7)}}"echo "{{ params.my_param }}"{% endfor %}
"""t3 = BashOperator(task_id='templated',bash_command=templated_command,params={'my_param': 'Parameter I passed in'},dag=dag)t2.set_upstream(t1)
t3.set_upstream(t1)
  1. retries (int) 重试几次才标记为失败
  2. retry_delay (timedelta) 两次重试间隔多长时间
  3. retry_exponential_backoff(bool)在重试延迟上运用算法增加等待时间
  4. max_retry_delay (timedelta) 重试之间的最大时间间隔
  5. start_time (datatime)确定第一个任务实例的execution_date,最佳做法是将start_date四舍五入到DAG的schedule_interval。
  6. end_time (datetime)如果指定,调度程序将不会超出此日期。
  7. depends_on_past (bool)设置为true时,任务实例将依次运行,同时依赖上一个任务的计划成功。允许start_date的任务实例运行。
  8. wait_for_downstream (bool)TODO
  9. dag (DAG) 任务所附的dag的引用(如果有的话)
  10. parallelism:这个参数指定了整个Airflow系统,在任何一刻能同时运行的Task Instance的数量,这个数量跟DAG无关,只跟Executor和Task有关。举个例子:如果parallelism=15, 这时你有两个DAG,A和B,如果A需要同时开跑10个Task,B也要同时开跑10个Task,两个DAG同时触发,那么这时候同时在跑的Task数量只能是15,其余的5个会等之前的Task运行完了触发,这时的状态不会显示在web上。而且在这种情况下,触发的顺序是不确定的。

  11. dag_concurrency:这个参数指定了同一个Dag Run中能同时运行的Task Instance的个数

  12. max_active_runs_per_dag:这个参数指定了同一个Dag能被同时激活的Dag Run的数量

  13. non_pooled_task_slot_count:这个参数指定了默认的Pool能同时运行的Task Instance的数量,如果你的Task没有指定Pool选项,那么这个Task就是属于这个默认的Pool的

  14. 更多配置

airflow执行组件

  • Scheduler:这个是整个Airflow的调度器,Airflow所有DAG的调度过程是由Scheduler轮询来处理的。触发条件达到后,会丢给Executor执行。
  • Executor:现在的Executor有三种:
  • SequnceExecutor:提供本地执行,并且串行执行一个DAG中的所有Task,基本上只用在初期的Airflow概念验证阶段
  • LocalExecutor:这个是比较常用的Executor,可以在本地并行执行一个DAG内的所有Task
  • CeleryExecutor:这个是在大型任务调度场景,或者是表较复杂的任务分离场景中需要用到的Executor。顾名思义,在这个Executor下,Airflow使用了Celery这个强大的Python分布式队列框架去分发任务,然后在这样的环境下,需要在执行任务的机器上启用Airflow Worker来处理队列中的请求。
  • 在一个Airflow中同时只能一个Executor启动,不能给指定的DAG指定Executor
  • Pool:这个Pool虽然不是Airflow的核心,但也跟整个Airflow的执行流程相关。任何一个Task其实都是指定了Pool这个参数的,即使没有自己指定,其实也是归结到了Default Pool这么个池子中。Pool本身是个抽象的概念,由Slot组成,可以建立任何一个Pool,指定Slot的数量。任何一个使用了这个Pool的Task Instance就需要占用一个Slot,Slot用完了,Task就处于等待状态。

配置文件


  • 配置元素优先级

环境变量
airflow.cfg中的配置
airflow.cfg中的命令
默认

数据库


  • 官方推荐 MySQL or Postgres 两种数据库
  • 本地配置好数据库后

在airflow.cfg 中配置“executor”为“LocalExecutor”,可以在本地并行化任务实例的执行程序。

感谢

http://wingerted.com


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部