Alo7 data warehouse sharing. Airflow is our team's main scheduling tool, so I organized its state transitions to make the source code easier to read. (The whole article is based on Airflow 1.10.3.)

In short

airflow-scheduler-state-change (figure above): a normal Scheduler flow involves 11 states in total (Airflow defines 12 states overall; the remaining one, SHUTDOWN, is not covered in this article).
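
For reference, these states are plain string constants on airflow.utils.state.State. The snippet below simply lists them; it is a quick sketch based on the 1.10.3 source, and the exact attribute set can differ slightly between versions.

# List Airflow's task-instance states (sketch, based on airflow.utils.state in 1.10.3).
from airflow.utils.state import State

states = [
    State.NONE, State.REMOVED, State.SCHEDULED, State.QUEUED,
    State.RUNNING, State.SUCCESS, State.SHUTDOWN, State.FAILED,
    State.UP_FOR_RETRY, State.UP_FOR_RESCHEDULE,
    State.UPSTREAM_FAILED, State.SKIPPED,
]
print(len(states))  # 12 in total; all but SHUTDOWN appear in the flow described below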

※ The diagram below is the flowchart I drew while reading the Airflow source code; every item below can be located at its corresponding position in the diagram (the image is large, right-click to open it).

(image: flowchart of Airflow scheduler state changes)

☆ SchedulerJob, with the help of DagFileProcessorAgent:

☆ SchedulerJob itself

☆ LocalTaskJob checks whether the ti satisfies the RUN_DEPS dependencies.

☆ TaskInstance#task_runner#start: the task actually runs, and the state is set according to the program's result.

A bit more detail

Dependency management (airflow.ti_deps) (the source code excerpts below are slightly edited)

class airflow.ti_deps.deps.BaseTIDep

The abstract base class for task-instance dependencies; it defines the dependency-check flow.

from collections import namedtuple

# Result of a single dep check
TIDepStatus = namedtuple('TIDepStatus', ['dep_name', 'passed', 'reason'])

class BaseTIDep(object):
    IGNOREABLE = False   # whether this dep may be ignored
    IS_TASK_DEP = False  # whether this dep is a task-level dep

    def _failing_status(self, reason=''):  # dep check failed
        return TIDepStatus(self.name, False, reason)

    def _passing_status(self, reason=''):  # dep check passed
        return TIDepStatus(self.name, True, reason)

    def _get_dep_statuses(self, ti, session, dep_context=None):
        # the concrete dependency check is implemented by subclasses
        raise NotImplementedError

    def get_dep_statuses(self, ti, session, dep_context=None):
        # handles the default behaviour (e.g. ignore_all_deps, ignore_task_deps),
        # then delegates to _get_dep_statuses (overridden by subclasses) for the
        # dep-specific check
        from airflow.ti_deps.dep_context import DepContext

        if dep_context is None:
            dep_context = DepContext()

        if self.IGNOREABLE and dep_context.ignore_all_deps:
            yield self._passing_status(
                reason="Context specified all dependencies should be ignored.")
            return

        if self.IS_TASK_DEP and dep_context.ignore_task_deps:
            yield self._passing_status(
                reason="Context specified all task dependencies should be ignored.")
            return

        for dep_status in self._get_dep_statuses(ti, session, dep_context):
            yield dep_status
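
Before looking at a real subclass, here is a minimal, purely hypothetical dep (not part of Airflow) that shows the pattern: a subclass only overrides _get_dep_statuses and yields passing/failing statuses, while get_dep_statuses handles the ignore flags. The NAME attribute and module path follow the 1.10.3 layout.

# Hypothetical dep for illustration only: passes when the task instance has a queue set.
from airflow.ti_deps.deps.base_ti_dep import BaseTIDep

class HasQueueDep(BaseTIDep):
    NAME = "Task Instance Has A Queue"
    IGNOREABLE = True
    IS_TASK_DEP = True

    def _get_dep_statuses(self, ti, session, dep_context=None):
        if ti.queue:
            yield self._passing_status(reason="The task instance has a queue set.")
        else:
            yield self._failing_status(reason="The task instance has no queue set.")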

An example BaseTIDep subclass

TriggerRuleDep not only reports whether the dep is satisfied; when flag_upstream_failed is True it also sets the task's upstream_failed and skipped states. The rules are as follows:

### If the task has no direct upstream tasks, or its trigger rule is DUMMY, the dep passes.

if not ti.task.upstream_list:
    yield self._passing_status(
        reason="The task instance did not have any upstream tasks.")
    return

if ti.task.trigger_rule == TR.DUMMY:
    yield self._passing_status(reason="The task had a dummy trigger rule set.")
    return

### Count the states of the upstream task instances.

import airflow.example_dags.example_branch_operator as brh_op
brh_op.dag.get_task('join').upstream_task_ids
# {'follow_branch_a', 'follow_branch_b', 'follow_branch_d', 'follow_branch_c'}

###
# successes: number of upstream task instances with state = success
# skipped: number of upstream task instances with state = skipped
# failed: number of upstream task instances with state = failed
# upstream_failed: number of upstream task instances with state = upstream_failed
# done: number of upstream task instances already in one of the four states above
# upstream_done: done >= len(upstream_task_ids), i.e. every upstream task has finished
###
qry = (
    session
    .query(
        func.coalesce(func.sum(
            case([(TI.state == State.SUCCESS, 1)], else_=0)), 0),
        func.coalesce(func.sum(
            case([(TI.state == State.SKIPPED, 1)], else_=0)), 0),
        func.coalesce(func.sum(
            case([(TI.state == State.FAILED, 1)], else_=0)), 0),
        func.coalesce(func.sum(
            case([(TI.state == State.UPSTREAM_FAILED, 1)], else_=0)), 0),
        func.count(TI.task_id),
    )
    .filter(
        TI.dag_id == ti.dag_id,
        TI.task_id.in_(ti.task.upstream_task_ids),
        TI.execution_date == ti.execution_date,
        TI.state.in_([
            State.SUCCESS, State.FAILED,
            State.UPSTREAM_FAILED, State.SKIPPED]),
    )
)
successes, skipped, failed, upstream_failed, done = qry.first()
| Trigger rule | Upstream state | State changes to | Note |
| --- | --- | --- | --- |
| ALL_SUCCESS | upstream_failed != 0 or failed != 0 | UPSTREAM_FAILED | |
| ALL_SUCCESS | upstream_failed == 0 and failed == 0 and skipped != 0 | SKIPPED | |
| ALL_FAILED | successes != 0 or skipped != 0 | SKIPPED | |
| ONE_SUCCESS | upstream_done == True and successes == 0 | SKIPPED | |
| ONE_FAILED | upstream_done == True and failed == 0 and upstream_failed == 0 | SKIPPED | |
| NONE_FAILED | upstream_failed != 0 or failed != 0 | UPSTREAM_FAILED | added in 1.10.2 |
| NONE_FAILED | skipped >= number of upstream tasks | SKIPPED | |
| NONE_SKIPPED | skipped != 0 | SKIPPED | added in 1.10.3 |
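
To make the ALL_SUCCESS rows concrete, the sketch below shows how the counts from the query above could be turned into a state change plus a dep status. It is an illustrative simplification of TriggerRuleDep's logic, not the verbatim 1.10.3 code, and evaluate_all_success is a made-up helper name.

# Simplified sketch of the ALL_SUCCESS branch of TriggerRuleDep (illustrative, not verbatim).
from airflow.utils.state import State

def evaluate_all_success(ti, session, successes, skipped, failed, upstream_failed,
                         upstream, flag_upstream_failed):
    if flag_upstream_failed:
        if upstream_failed or failed:
            ti.set_state(State.UPSTREAM_FAILED, session)  # first ALL_SUCCESS row in the table
        elif skipped:
            ti.set_state(State.SKIPPED, session)  # second ALL_SUCCESS row in the table
    if successes < upstream:
        # dep fails: not every upstream task instance has succeeded yet
        return False, "Not all upstream tasks have succeeded."
    return True, "All upstream tasks succeeded."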

class airflow.ti_deps.DepContext

A context class that holds the deps to be evaluated for a task instance and configures how they are evaluated.

class DepContext(object):
    def __init__(
            self,
            deps=None or set(),  # the deps to evaluate
            flag_upstream_failed=False,  # whether to generate the UPSTREAM_FAILED / SKIPPED states
            ignore_all_deps=False,  # ignore all deps
            ignore_depends_on_past=False,  # ignore the DAG's depends_on_past parameter
            ignore_in_retry_period=False,  # ignore the retry period
            ignore_in_reschedule_period=False,  # ignore the reschedule period
            ignore_task_deps=False,  # ignore task-specific deps such as depends_on_past and trigger_rule
            ignore_ti_state=False  # ignore the TI's previous state
    ):
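
As a usage sketch, this is how a caller (for instance LocalTaskJob checking RUN_DEPS, as mentioned above) might evaluate deps through a DepContext. It assumes the 1.10.x layout where RUN_DEPS lives in airflow.ti_deps.dep_context and uses TaskInstance.are_dependencies_met / get_failed_dep_statuses; can_run is a made-up helper name.

# Sketch: evaluating RUN_DEPS for a task instance via a DepContext (1.10.x layout assumed).
from airflow.ti_deps.dep_context import DepContext, RUN_DEPS
from airflow.utils.db import provide_session

@provide_session
def can_run(ti, session=None):
    dep_context = DepContext(
        deps=RUN_DEPS,               # the dep set LocalTaskJob checks before running a ti
        flag_upstream_failed=False,  # report only, do not set UPSTREAM_FAILED/SKIPPED
        ignore_ti_state=False)
    if ti.are_dependencies_met(dep_context=dep_context, session=session, verbose=True):
        return True
    for status in ti.get_failed_dep_statuses(dep_context=dep_context, session=session):
        print(status.dep_name, status.reason)
    return False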

Executor

Defines how Airflow interacts with the execution backend (e.g. YARN, Celery).

class airflow.executors.base_executor

The abstract base class for Executors.

class BaseExecutor(LoggingMixin):
    def __init__(self, parallelism=PARALLELISM):
        self.parallelism = parallelism  # number of task instances that may run in parallel
        self.queued_tasks = OrderedDict()
        self.running = {}
        self.event_buffer = {}

    # executors can do their initialization here
    def start(self):
        pass

    # put a task command into the queue
    def queue_command(self, simple_task_instance, command, priority=1, queue=None):
        key = simple_task_instance.key
        self.queued_tasks[key] = (command, priority, queue, simple_task_instance)

    # run tasks synchronously
    def sync(self):
        pass

    # run a task asynchronously
    def execute_async(self,
                      key,
                      command,
                      queue=None,
                      executor_config=None):  # pragma: no cover
        raise NotImplementedError()

    # called periodically by SchedulerJob: (simplified here, ignoring parallelism and
    # priority ordering) hand each queued command to execute_async(), then call sync()
    # to collect results
    def heartbeat(self):
        while self.queued_tasks:
            key, (command, _, queue, simple_ti) = self.queued_tasks.popitem(last=False)
            self.running[key] = command
            self.execute_async(key=key, command=command, queue=queue)
        self.sync()

class airflow.executors.sequential_executor

class SequentialExecutor(BaseExecutor):
    def __init__(self):
        super(SequentialExecutor, self).__init__()
        self.commands_to_run = []

    def execute_async(self, key, command, queue=None, executor_config=None):
        # executes one by one, no concurrency: just remember the command and
        # let the synchronous sync() call actually run it
        self.commands_to_run.append((key, command,))

    def sync(self):
        for key, command in self.commands_to_run:
            subprocess.check_call(command, close_fds=True)
        self.commands_to_run = []

    def end(self):
        self.heartbeat()
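
To tie the pieces together, here is a hypothetical driver that walks one command through the queue -> heartbeat -> execute_async -> sync path of the SequentialExecutor. FakeSimpleTI stands in for SimpleTaskInstance (queue_command only needs a key here), and the echo command is a harmless placeholder; this is a sketch against the simplified interfaces above, not SchedulerJob's real code.

# Hypothetical driver for the simplified executor interfaces above.
from collections import namedtuple
from airflow.executors.sequential_executor import SequentialExecutor

FakeSimpleTI = namedtuple("FakeSimpleTI", ["key", "executor_config"])  # stand-in for SimpleTaskInstance
simple_ti = FakeSimpleTI(key=("example_dag", "task_a", "2019-01-01T00:00:00", 1),
                         executor_config=None)

executor = SequentialExecutor()
executor.start()

# SchedulerJob-side code queues a command for the task instance...
executor.queue_command(simple_ti, command=["echo", "running task_a"], priority=1)

# ...and each scheduler loop calls heartbeat(): queued commands are handed to
# execute_async(), then sync() runs them one by one via subprocess.
executor.heartbeat()
executor.end()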
