How to add new DAGs to Airflow?

I think the reason for this is that you haven't exported AIRFLOW_HOME. Try running: AIRFLOW_HOME="/home/alex/airflow/dags" airflow list_dags. If that doesn't work, then do it in two steps: first export AIRFLOW_HOME="/home/alex/airflow/dags", then run airflow list_dags. I believe this should work. Give it a go.

How does the >> operator define task dependencies in Airflow?

Airflow represents workflows as directed acyclic graphs. A workflow is any number of tasks that have to be executed, either in parallel or sequentially. The “>>” is Airflow syntax for setting a task downstream of another. Diving into the incubator-airflow project repo, models.py in the airflow directory defines the behavior of much of the high … Read more
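As a rough illustration of the idea (a toy stand-in, not Airflow's actual source), Python lets a class give `>>` its own meaning by defining `__rshift__`. The `Task` class and its `downstream` list below are hypothetical names invented for this sketch. The only behaviour assumed from Airflow is that `a >> b` sets `b` downstream of `a` and evaluates to `b`, so chains read left to right.

```python
class Task:
    """Toy stand-in for an Airflow operator (hypothetical, for illustration only)."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []  # tasks that should run after this one

    def set_downstream(self, other):
        # Record the dependency edge self -> other.
        self.downstream.append(other)

    def __rshift__(self, other):
        # `self >> other` is sugar for self.set_downstream(other).
        self.set_downstream(other)
        # Returning `other` lets `a >> b >> c` chain left to right.
        return other


extract = Task("extract")
transform = Task("transform")
load = Task("load")

extract >> transform >> load

for task in (extract, transform, load):
    print(task.task_id, "->", [t.task_id for t in task.downstream])
```

Because `__rshift__` returns its right-hand operand, `extract >> transform >> load` is evaluated as `(extract >> transform) >> load`, which is what makes the one-line chain possible.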

How to run Spark code in Airflow?

You should be able to use BashOperator. Keeping the rest of your code as is, import the required class and system packages: from airflow.operators.bash_operator import BashOperator import os import sys set the required paths: os.environ['SPARK_HOME'] = '/path/to/spark/root' sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'bin')) and add the operator: spark_task = BashOperator( task_id='spark_java', bash_command='spark-submit --class {{ params.class }} {{ params.jar }}', params={'class': 'MainClassName', 'jar': … Read more
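To see what the templated bash_command amounts to once the params are filled in, here is a plain-Python sketch that builds the same spark-submit command line. It uses neither Airflow nor Jinja. The helper name `build_spark_submit` and the jar path are hypothetical placeholders, not values from the original answer.

```python
import shlex


def build_spark_submit(main_class, jar):
    """Return a shell-safe spark-submit command line (hypothetical helper)."""
    # shlex.join quotes each argument, so paths containing spaces survive the shell.
    return shlex.join(["spark-submit", "--class", main_class, jar])


# Placeholder values; substitute your own class name and jar location.
command = build_spark_submit("MainClassName", "/path/to/app.jar")
print(command)
```

A string built this way could be passed as bash_command in place of the template, at the cost of losing Airflow's per-run templating.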

Store and access password using Apache airflow

You can store the password in an Airflow Connection and read it through a hook. It will be encrypted as long as you have set up your Fernet key. Create the connection via the UI, then access the password like this: from airflow.hooks.base_hook import BaseHook # Deprecated in Airflow 2 connection = BaseHook.get_connection("username_connection") password = connection.password # This … Read more

How to limit Airflow to run only one instance of a DAG run at a time?

You've put 'max_active_runs': 1 into the default_args parameter, which is not the correct spot. max_active_runs is a constructor argument of DAG and should not go in the default_args dictionary. Here is an example DAG that shows where to move it: dag_args = { 'owner': 'Owner', # 'max_active_runs': 1, # … Read more

Can’t import Airflow plugins

After struggling with the Airflow documentation and trying some of the answers here without success, I found this approach from astronomer.io. As they point out, building an Airflow Plugin can be confusing and perhaps not the best way to add hooks and operators going forward. Custom hooks and operators are a powerful way to extend … Read more

Airflow backfill clarification

When you change the scheduler toggle to “on” for a DAG, the scheduler will trigger a backfill of all dag run instances for which it has no status recorded, starting with the start_date you specify in your “default_args”. For example: If the start date was “2017-01-21” and you turned on the scheduling toggle at “2017-01-22T00:00:00” … Read more
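The catch-up behaviour described above can be approximated in plain Python. This is a minimal sketch under stated assumptions: a daily schedule, catch-up enabled, and Airflow's convention that a run for a given execution date becomes due only once its schedule interval has fully elapsed. The function `pending_execution_dates` is a hypothetical name and not part of Airflow.

```python
from datetime import datetime, timedelta


def pending_execution_dates(start_date, now, interval=timedelta(days=1)):
    """List execution dates whose schedule interval has fully elapsed by `now`."""
    dates = []
    current = start_date
    # A run for `current` becomes due only when current + interval has passed.
    while current + interval <= now:
        dates.append(current)
        current += interval
    return dates


# Start date and toggle time taken from the example in the text.
runs = pending_execution_dates(datetime(2017, 1, 21), datetime(2017, 1, 22))
print(runs)
```

Under these assumptions, turning the toggle on later simply lengthens the list, one entry per completed interval since the start date.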
