How to create a conditional task in Airflow

Airflow 2.x provides a branching decorator that allows you to return the task_id (or list of task_ids) that should run: @task.branch(task_id="branch_task") def branch_func(ti): xcom_value = int(ti.xcom_pull(task_ids="start_task")) if xcom_value >= 5: return "big_task" # run just this one task, skip all else elif xcom_value >= 3: return ["small_task", "warn_task"] # run these, skip all else … Read more
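The branching callable can be exercised outside Airflow, since @task.branch only needs the function to return the id(s) of the task(s) to follow. A minimal sketch of the same decision logic, with the XCom pull replaced by a plain argument (task names follow the excerpt):

```python
def branch_func(xcom_value: int):
    # Mirror the decision logic from the excerpt: return the task_id
    # (or list of task_ids) that should run; everything else is skipped.
    if xcom_value >= 5:
        return "big_task"                   # run just this one task
    elif xcom_value >= 3:
        return ["small_task", "warn_task"]  # run these two, skip the rest
    return None                             # returning None skips all downstream tasks
```

Inside a real DAG the same function body would sit under @task.branch and pull its input via ti.xcom_pull, as the excerpt shows.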

execution_date in airflow: need to access as a variable

The BashOperator's bash_command argument is a template. You can access execution_date in any template as a datetime object using the execution_date variable. In the template, you can use any Jinja2 methods to manipulate it. Using the following as your BashOperator bash_command string: # pass in the first of the current month some_command.sh {{ execution_date.replace(day=1) }} … Read more
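The {{ execution_date.replace(day=1) }} expression works because execution_date is a plain Python datetime, so any datetime method is available inside the template. A sketch of what the rendered command ends up looking like (the date and the some_command.sh name are illustrative):

```python
from datetime import datetime

# execution_date as Airflow would pass it into the template context
execution_date = datetime(2024, 3, 15)

# equivalent of {{ execution_date.replace(day=1) }} after rendering
first_of_month = execution_date.replace(day=1)
bash_command = f"some_command.sh {first_of_month.date()}"
# bash_command is now "some_command.sh 2024-03-01"
```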

Apache Airflow or Apache Beam for data processing and job scheduling

The other answers are quite technical and hard to understand. I was in your position before, so I'll explain in simple terms. Airflow can do anything. It has BashOperator and PythonOperator, which means it can run any bash script or any Python script. It is a way to organize (set up complicated data pipeline DAGs), schedule, … Read more

Airflow: how to delete a DAG?

Edit 8/27/18 – Airflow 1.10 is now released on PyPI! https://pypi.org/project/apache-airflow/1.10.0/ How to delete a DAG completely We have this feature now in Airflow ≥ 1.10! The PR #2199 (Jira: AIRFLOW-1002) adding DAG removal to Airflow has now been merged which allows fully deleting a DAG’s entries from all of the related tables. The core … Read more
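The deletion described above is also exposed on the command line. A CLI fragment showing the subcommand (the DAG id my_unwanted_dag is a placeholder; this is not runnable without a working Airflow installation):

```shell
# Airflow 1.10.x: remove all of a DAG's entries from the metadata tables
airflow delete_dag my_unwanted_dag

# Airflow 2.x moved the same operation under the "dags" command group:
# airflow dags delete my_unwanted_dag
```

Note that this deletes metadata only; the DAG file itself must still be removed from the dags folder or the scheduler will re-register it.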

How to remove default example dags in airflow

When you start up Airflow, make sure you set: load_examples = False inside your airflow.cfg. If you have already started Airflow without this set to false, you can set it to false and run airflow resetdb in the CLI (warning: this will destroy all current DAG information!). Alternatively you can go into the airflow_db and manually … Read more
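Concretely, the setting lives under the [core] section of airflow.cfg (a config fragment, not runnable on its own):

```ini
[core]
# Do not load the bundled example DAGs on startup
load_examples = False
```

If the examples were already loaded, airflow resetdb (Airflow 1.x; airflow db reset in 2.x) clears them, along with all other DAG metadata.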

Proper way to create dynamic workflows in Airflow

Here is how I did it with a similar request, without any subdags: First, create a method that returns whatever values you want: def values_function(): return values Next, create a method that will generate the jobs dynamically: def group(number, **kwargs): # load the values if needed in the command you plan to execute dyn_value = "{{ task_instance.xcom_pull(task_ids='push_func') … Read more
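Stripped of the Airflow-specific operator calls, the pattern in the excerpt is a loop over values_function() that creates one uniquely named task per value. A plain-Python sketch of that naming scheme (the values and the group_task_ prefix are illustrative, not from the original answer):

```python
def values_function():
    # Placeholder for whatever produces the dynamic inputs
    return ["alpha", "beta", "gamma"]

def build_task_ids():
    # One task per value, each with a unique, stable task_id.
    # In a real DAG, each id would be passed to an operator constructed
    # inside this loop, wiring start >> task >> end dynamically.
    return [f"group_task_{i}" for i, _ in enumerate(values_function())]
```

Keeping the ids deterministic matters: Airflow identifies tasks by task_id across scheduler runs, so the generated names must be stable between DAG parses.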

Error!: SQLSTATE[HY000] [1045] Access denied for user 'divattrend_liink'@'localhost' (using password: YES)