How to create “trigger-only” Python DAGs and pass arguments from the front end/client in Apache Airflow
At times while working with Apache Airflow, the need arises for a DAG that receives a new parameter each time it is triggered. I find this pattern useful for job-id based processing: a user uploads, say, a CSV, a job id is associated with it, and you need to trigger a DAG to process that particular set of inputs for the current job_id. I use this as an alternative to Celery tasks.
So you need two things for this:
- A DAG with its schedule set to None, so it only runs when triggered explicitly
- The apache-airflow-client PyPI package, front-end code, or Postman
Let's look at the DAG first.
from datetime import timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'techkamar',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'start_date': days_ago(2),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

# main business logic here
def run(**kwargs):
    params = kwargs['params']
    print("Parameters : " + str(params))

dag = DAG(
    'test',
    default_args=default_args,
    description='Test Dag',
    schedule_interval=None,
)

# define the first task
t1 = PythonOperator(
    task_id='core_function',
    python_callable=run,
    dag=dag,
    provide_context=True
)

t1
In the above code block, you can see that the params variable in the “run” method is the one we will populate from the Python client or the browser.
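Outside of Airflow, the callable is just a keyword-argument function, so you can sanity-check the parameter handling locally before deploying; a minimal sketch (no scheduler required, the payload value is the same illustrative job id used later in this post):

```python
# Stand-in for the "run" callable above; at trigger time Airflow passes
# the conf payload through to the task context as kwargs['params'].
def run(**kwargs):
    params = kwargs.get('params', {})
    print("Parameters : " + str(params))
    return params

# Simulate what Airflow does when the DAG is triggered with a conf payload.
result = run(params={"job_id": "A36NPQE"})
```

Using `kwargs.get('params', {})` instead of direct indexing also keeps the task from failing if the DAG is ever triggered without a conf payload.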
Now we will trigger it first with the Python client, and then from the browser or Postman.
First, let's install the client:
pip install apache-airflow-client
Then let's create a client.py file that can be run to trigger this DAG from Python:
from airflow_client import client
from airflow_client.client.api import dag_run_api
from airflow_client.client.model.dag_run import DAGRun
from pprint import pprint

# Defining the host is optional and defaults to http://localhost/api/v1
# See configuration.py for a list of all supported configuration parameters.
# The client must configure the authentication and authorization parameters
# in accordance with the API server security policy.
# Configure HTTP basic authorization: Basic
configuration = client.Configuration(
    host="http://localhost:8080/api/v1",
    username='admin',
    password='admin'
)

# Enter a context with an instance of the API client
with client.ApiClient(configuration) as api_client:
    # Create an instance of the API class
    api_instance = dag_run_api.DAGRunApi(api_client)
    dag_id = "test"  # str | The DAG ID.
    dag_run = DAGRun(
        dag_run_id="a8f079b5-de37-4e77-9a56-e08f3081edce",
        conf={"job_id": "A36NPQE"},
    )
    try:
        # Trigger a new DAG run
        api_response = api_instance.post_dag_run(dag_id, dag_run)
        pprint(api_response)
    except client.ApiException as e:
        print("Exception when calling DAGRunApi->post_dag_run: %s\n" % e)
In the above file you can see that we are calling the test DAG with the parameters defined in the conf argument.
So when the DAG is executed it will print:
Parameters : {'job_id': 'A36NPQE'}
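One caveat: dag_run_id must be unique per DAG, so the hard-coded UUID in the client example will only work for one trigger. A simple way to generate a fresh id per trigger (the "manual__" prefix is just my own convention):

```python
import uuid

# Generate a unique run id for each trigger; Airflow rejects a
# dag_run_id that was already used for the same DAG.
dag_run_id = "manual__" + str(uuid.uuid4())
print(dag_run_id)
```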
Now, to learn how to do the same with the REST API, you can follow this article, which helped me:
https://brocktibert.com/post/trigger-airflow-dags-via-the-rest-api/
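If you'd rather not pull in the client package at all, the trigger is a single POST to the stable REST API endpoint /dags/{dag_id}/dagRuns. A minimal sketch using only the standard library, assuming the same local host and admin/admin credentials as in the client example above:

```python
import base64
import json
import urllib.request

API_BASE = "http://localhost:8080/api/v1"  # assumed local webserver

def dag_runs_url(base, dag_id):
    # Endpoint for creating (triggering) a DAG run via the stable REST API.
    return f"{base}/dags/{dag_id}/dagRuns"

def trigger_dag(dag_id, conf, user="admin", password="admin"):
    # HTTP Basic auth, matching the Python client configuration above.
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    req = urllib.request.Request(
        dag_runs_url(API_BASE, dag_id),
        data=json.dumps({"conf": conf}).encode(),
        headers={
            "Content-Type": "application/json",
            "Authorization": "Basic " + token,
        },
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())

# trigger_dag("test", {"job_id": "A36NPQE"})  # requires a running webserver
```

This is the same call Postman would make: POST the JSON body {"conf": {...}} with Basic auth to that URL.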