原则
- 按Minor变化来记录
- 仅记录个人感兴趣的更新内容
- 一般仅记录平台通用功能更新(和特定系统绑定的功能暂无测试)
Airflow 2.2.0 -> 2.2.1, 2021-10-29
- AIP-39: Add (customizable) Timetable class to Airflow for richer scheduling behaviour
- Add pre/post execution hooks
Airflow 2.1.1 -> 2.1.4, 2021-09-18
- Set max tree width to 1200 pixels 配合方块靠右对齐, 对长 task 名就太友好了
Airflow 2.1.0, 2021-05-12
-
增加 PythonVirtualenvDecorator decorator
@task.virtualenv( use_dill=True, system_site_packages=False, requirements=['funcsigs'], ) def extract(): data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}' order_data_dict = json.loads(data_string) return order_data_dict
- 增加 Taskgroup decorator
- !!!增加cross-DAG dependencies view
- Tree、View 支持自动刷新
Airflow 2.0.0~2.0.2, 2020-12-18
新功能:
- the TaskFlow API (AIP-31): 编写DAG的新方式:
from airflow.decorators import dag, task from airflow.utils.dates import days_ago @dag(default_args={'owner': 'airflow'}, schedule_interval=None, start_date=days_ago(2)) # 配置DAG def tutorial_taskflow_api_etl(): @task # Task Extract PyOperator def extract(): return {"1001": 301.27, "1002": 433.21, "1003": 502.22} # Save To XCom @task def transform(order_data_dict: dict) -> dict: # Task Transform PyhtonOpeator total_order_value = 0 for value in order_data_dict.values(): total_order_value += value return {"total_order_value": total_order_value} # Save To XCom @task() def load(total_order_value: float): print("Total order value is: %.2f" % total_order_value) order_data = extract() # 定义 依赖 order_summary = transform(order_data) load(order_summary["total_order_value"]) tutorial_etl_dag = tutorial_taskflow_api_etl()
其中transform 在1.x中的写法:
with DAG( 'tutorial_etl_dag', default_args=default_args, description='ETL DAG tutorial', schedule_interval=None, start_date=days_ago(2), tags=['example'], ) as dag: ...... def transform(**kwargs): ti = kwargs['ti'] extract_data_string = ti.xcom_pull(task_ids='extract', key='order_data') order_data = json.loads(extract_data_string) total_order_value = 0 for value in order_data.values(): total_order_value += value total_value = {"total_order_value": total_order_value} total_value_json_string = json.dumps(total_value) ti.xcom_push('total_order_value', total_value_json_string) # Save To XCOM transform_task = PythonOperator( # Define PythonOperator task_id='transform', python_callable=transform, ) extract_task >> transform_task >> load_task # Define Dep
- 非实验性质的RESTFul API:API
- Scheduler 性能提升并支持HA!!!
- 要求: Postgres 9.6+ or MySQL 8+
- 调度逻辑:
- 为需要的DAGs创建DAGRUN
- 检查一批DAGRUN:
- 可调度的TaskInstances
- 是否完成
- 选择可调度的TI进行调度(符合各种限制)
- 相关配置项:
- max_dagruns_to_create_per_loop
- max_dagruns_per_loop_to_schedule
- ……
- Smart Sensors:将多个轻量的Sensor整合进一个独立进程,减少开销。
- early-access, 后续可能发生不兼容改动
- 经过Airbnb的测试
- 方法:
- 序列化Sences Task信息到database。
- 集中批量执行。
Airflow 1.10.15
无
Airflow 1.10.14
无
Airflow 1.10.13, 2020-11-25
无
Airflow 1.10.12 发布于 2020-08-25
新功能:
- 新增DateTimeSensor(#9697):
主要特点是 target_time 具有幂等性。在某些情境下比 TimeSensor 和 TimeDeltaSensor 用起来共方便:
- 在执行 backfill 操作时TaskInstance不需要等待某个固定时间间隔(使用TimeDeltaSensor)或者等待到某天的固定时间段(使用TimeSensor) 即可执行.
- 如果一个DAG开始于每天 23:00,但是部分 task 需要延迟到第二天一点执行. TimeSensor 仅对时间进行判断会出现错误(23:00>1:00)
DateTimeSensor( task_id='wait_for_0100', target_time='{ next_execution_date.tomorrow().replace(hour=1) }', dag=dag )
- 新增 AirflowClusterPolicyViolation 异常(#10282):
当 dag 或者 task 不符合定义的 cluster policies 时,可以抛出该错误阻止dag 导入或者 task 执行
def task_must_have_owners(task): if not task.owner or task.owner.lower() == conf.get('operators', 'default_owner'): raise AirflowClusterPolicyViolation( 'Task must have non-None non-default owner. Current value: {}'.format(task.owner))
- 允许从 SecretBackends 获取敏感数据(#9645):
目前定义有:
# airflow/airflow/configuration.py sensitive_config_values = { ('core', 'sql_alchemy_conn'), ('core', 'fernet_key'), ('celery', 'broker_url'), ('celery', 'flower_basic_auth'), ('celery', 'result_backend'), # Todo: remove this in Airflow 1.11 ('celery', 'celery_result_backend'), ('atlas', 'password'), ('smtp', 'smtp_password'), ('ldap', 'bind_password'), ('kubernetes', 'git_password'), }
- PostgresHook.insert_rows() 支持 Upsert 功能.#8625
语法
INSERT ... ON CONFLICT DO UPDATE
自 Postgres 9.5 新增. - 支持自定义 XCom 类.#8560
支持自定义序列化和反序列化操作
# 至少复写 serialize_value 和 deserialize_value 两个方法 class CustomXCom(BaseXCom): @staticmethod def serialize_value(_): return "custom_value" @staticmethod def deserialize_value(_): return "custom_value"