Is there a way to create/modify connections through Airflow API

Connection is actually a model, which you can use to query and to insert a new connection:

```python
from airflow import settings
from airflow.models import Connection

# Create a Connection object
conn = Connection(
    conn_id=conn_id,
    conn_type=conn_type,
    host=host,
    login=login,
    password=password,
    port=port,
)

session = settings.Session()  # get the session
session.add(conn)
session.commit()  # inserts the connection object programmatically
```
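Since the question also asks about modifying connections, here is a hedged upsert sketch along the same lines: query for an existing row first, then update it or add a new one. The function name `upsert_connection` is invented for this example; it assumes Airflow is installed and configured.

```python
def upsert_connection(conn_id, conn_type, host=None, login=None,
                      password=None, port=None):
    """Create the connection if it is missing, otherwise update it (sketch)."""
    # Imports kept inside the function so the module loads without Airflow
    from airflow import settings
    from airflow.models import Connection

    session = settings.Session()
    existing = (
        session.query(Connection)
        .filter(Connection.conn_id == conn_id)
        .one_or_none()
    )
    if existing is None:
        session.add(Connection(conn_id=conn_id, conn_type=conn_type,
                               host=host, login=login,
                               password=password, port=port))
    else:
        existing.conn_type = conn_type
        existing.host = host
        existing.login = login
        existing.password = password
        existing.port = port
    session.commit()
```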

Airflow structure/organization of Dags and tasks

I use something like this. A project is normally something completely separate or unique: perhaps DAGs to process files that we receive from a certain client, which will be completely unrelated to everything else (almost certainly a separate database schema). I have my operators, hooks, and some helper scripts (delete all Airflow data for a … Read more
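As a hypothetical illustration of that kind of layout (all folder and file names are invented for the example):

```
dags/
├── client_one/          # DAGs unique to one project/client
│   ├── process_files.py
│   └── load_schema.py
├── common/
│   ├── hooks/           # shared custom hooks
│   ├── operators/       # shared custom operators
│   └── scripts/         # helper scripts (e.g. cleanup utilities)
└── client_two/
    └── ingest.py
```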

How to set up Airflow Send Email?

Setting up an SMTP server for Airflow email alerts using Gmail: Create an email ID from which you want to send alerts about DAG failures, or which you want to use with EmailOperator. Edit the airflow.cfg file to set the SMTP details for the mail server. For a demo you can use any Gmail account. Create a Google App … Read more
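A minimal sketch of the corresponding `[smtp]` section in airflow.cfg, assuming Gmail with an App Password (the address and password shown are placeholders):

```ini
[smtp]
smtp_host = smtp.gmail.com
smtp_starttls = True
smtp_ssl = False
smtp_user = your_email@gmail.com
smtp_password = your_app_password
smtp_port = 587
smtp_mail_from = your_email@gmail.com
```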

Airbnb Airflow vs Apache Nifi [closed]

For a great overview of Airflow and Apache NiFi, check out this Reddit post: https://www.reddit.com/r/bigdata/comments/51mgk6/comparing_airbnb_airflow_and_apache_nifi/ For your specific use case of ingesting JSON files, enriching them, and routing them to Kafka, I believe NiFi is the right tool for the job. A couple of processors you could potentially use, as well as documentation for each, are below: … Read more

Running airflow tasks/dags in parallel

You will need to use LocalExecutor. Check your config (airflow.cfg); you might be using SequentialExecutor, which executes tasks serially. Airflow uses a backend database to store metadata. Look for the executor keyword in your airflow.cfg file. By default, Airflow uses SequentialExecutor, which executes tasks sequentially no matter what. So to allow Airflow to run … Read more
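A sketch of the relevant setting in airflow.cfg (the connection string is a placeholder; LocalExecutor requires a real database backend such as MySQL or Postgres rather than SQLite, and in newer Airflow versions `sql_alchemy_conn` lives under a `[database]` section instead of `[core]`):

```ini
[core]
executor = LocalExecutor
sql_alchemy_conn = postgresql+psycopg2://user:pass@localhost:5432/airflow
```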

How to pass parameter to PythonOperator in Airflow

Pass a dict object to op_kwargs, then use the keys to access their values from the kwargs dict in your Python callable:

```python
def SendEmail(**kwargs):
    print(kwargs['key1'])
    print(kwargs['key2'])
    msg = MIMEText("The pipeline for client1 is completed, please check.")
    msg['Subject'] = "xxxx"
    msg['From'] = "xxxx"
    # ......
    s = smtplib.SMTP('localhost')
    s.send_message(msg)
    s.quit()

t5_send_notification = PythonOperator(
    task_id='t5_send_notification',
    provide_context=True,
    python_callable=SendEmail,
    op_kwargs={'key1': 'value1', 'key2':
```

… Read more
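The mechanics behind `op_kwargs` are plain Python keyword-argument unpacking; a minimal stand-alone sketch (no Airflow required, function and key names invented for the example):

```python
def send_email(**kwargs):
    # PythonOperator passes each op_kwargs entry as a keyword argument,
    # so the callable sees them in its **kwargs dict
    return "pipeline={}, status={}".format(kwargs['key1'], kwargs['key2'])

# Roughly what PythonOperator does with op_kwargs when the task runs:
op_kwargs = {'key1': 'client1', 'key2': 'completed'}
result = send_email(**op_kwargs)
print(result)  # pipeline=client1, status=completed
```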

Error!: SQLSTATE[HY000] [1045] Access denied for user 'divattrend_liink'@'localhost' (using password: YES)