Airflow 2.x
Airflow provides a branching decorator that allows you to return the task_id (or list of task_ids) that should run:
from airflow.decorators import task

@task.branch(task_id="branch_task")
def branch_func(ti):
    xcom_value = int(ti.xcom_pull(task_ids="start_task"))
    if xcom_value >= 5:
        return "big_task"  # run just this one task, skip all else
    elif xcom_value >= 3:
        return ["small_task", "warn_task"]  # run these, skip all else
    else:
        return None  # skip everything
You can also subclass BaseBranchOperator directly and override its choose_branch
method, but for simple branching logic the decorator is the better fit.
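For comparison, here is a rough sketch of the subclassing route, reusing the same thresholds as the decorator example. The names `pick_branch`, `make_branch_operator` and `ThresholdBranchOperator` are made up for illustration, and the Airflow import is placed inside the factory function only so that the decision logic can be exercised without Airflow installed. It assumes an Airflow 2.x release in which returning None from a branch is accepted, as in the decorator example.

```python
def pick_branch(value):
    """Decision logic only: map a number to the task_id(s) to follow."""
    if value >= 5:
        return "big_task"  # follow just this task
    if value >= 3:
        return ["small_task", "warn_task"]  # follow both of these
    return None  # follow nothing


def make_branch_operator(**kwargs):
    """Build a branch operator instance (hypothetical helper)."""
    # Imported here so pick_branch stays importable without Airflow.
    from airflow.operators.branch import BaseBranchOperator

    class ThresholdBranchOperator(BaseBranchOperator):
        def choose_branch(self, context):
            # Pull the upstream value from XCom, then delegate to pick_branch.
            value = int(context["ti"].xcom_pull(task_ids="start_task"))
            return pick_branch(value)

    return ThresholdBranchOperator(**kwargs)
```

Inside a DAG you would call something like `make_branch_operator(task_id="branch_task")` and wire it upstream of big_task, small_task and warn_task. In a real DAG file you would more likely define the class at module level with a top-level import.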
Airflow 1.x
Airflow has a BranchPythonOperator that can be used to express the branching dependency more directly.
The docs describe its use:
The BranchPythonOperator is much like the PythonOperator except that it expects a python_callable that returns a task_id. The task_id returned is followed, and all of the other paths are skipped. The task_id returned by the Python function has to be referencing a task directly downstream from the BranchPythonOperator task.
If you want to skip some tasks, keep in mind that you can’t have an empty path, if so make a dummy task.
Code Example
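from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

def dummy_test():
    # Return the task_id of the branch to follow; the other path is skipped.
    return 'branch_a'

# `dag` is assumed to be a DAG object defined elsewhere.
A_task = DummyOperator(task_id='branch_a', dag=dag)
B_task = DummyOperator(task_id='branch_false', dag=dag)
branch_task = BranchPythonOperator(
    task_id='branching',
    python_callable=dummy_test,
    dag=dag,
)
# Both candidate tasks must be directly downstream of the branch task.
branch_task >> A_task
branch_task >> B_task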
If you’re running Airflow 1.10.3 or later, you can also return a list of task ids to follow several downstream paths (and skip the rest) from a single operator, so you don’t have to use a dummy task before joining.
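A minimal sketch of the list-returning variant, assuming Airflow 1.10.3 or later with the 1.x import paths. The DAG id, the start date, the task ids and the helper names `choose_paths` and `build_dag` are placeholders chosen for this example, not anything from the docs. The Airflow imports sit inside `build_dag` only so the callable can be checked on its own.

```python
from datetime import datetime


def choose_paths():
    # Placeholder rule: follow two of the three downstream tasks.
    return ["branch_a", "branch_b"]


def build_dag():
    # Airflow 1.x import paths; imported here so choose_paths is usable alone.
    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.python_operator import BranchPythonOperator

    dag = DAG(
        "branch_list_example",
        start_date=datetime(2019, 1, 1),
        schedule_interval=None,
    )
    branch_task = BranchPythonOperator(
        task_id="branching",
        python_callable=choose_paths,
        dag=dag,
    )
    # Every candidate task is directly downstream of the branch task;
    # branch_c is not in the returned list, so it would be skipped.
    for task_id in ("branch_a", "branch_b", "branch_c"):
        branch_task >> DummyOperator(task_id=task_id, dag=dag)
    return dag
```

In a DAG file you would call `dag = build_dag()` at module level so the scheduler can discover it.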