Elementals of Airflow: Part 2

Hands-on Guide: Exploring some aspects of Airflow's features that are not usually covered in depth.


This is the second part of the Airflow series, as indicated in the title. I have tried to spread the features across the articles. Here is a list of what I have covered so far:

  1. Templating, Airflow Connections, Airflow Variables, Xcoms
  2. Template Search Path, Running Python Scripts (with bash), Running SQL file, Catchup

Contents

  1. Template Search Path
  2. Running Python Scripts (with bash)
  3. Running SQL file
  4. Catchup

1. Template Search Path

Template Search Path (template_searchpath) is an argument in the DAG definition that contains a list of folder locations that Airflow will look into before searching for a given file location. It is a way of telling Airflow that whenever a templated file is referenced, it should first look into the paths provided in template_searchpath.

The paths provided in template_searchpath must be absolute paths to the folders where Airflow/Jinja will look for templates.

dag = DAG(
    "demo_dag",
    default_args = default_arguments,
    description = "Demo dag definition for template_searchpath",
    template_searchpath = ['/opt/airflow/sql_files'],
    schedule_interval = timedelta(hours = 12, minutes = 30),
    catchup = False
)

As mentioned, template_searchpath is defined inside the DAG definition, preferably as a list of strings. Here the path '/opt/airflow/sql_files' is the absolute path to the folder sql_files where the .sql files are stored.

task = PostgresOperator(
    task_id = 'run_sql_file',
    postgres_conn_id = 'postgres_conn',
    sql = 'sql_file.sql',
    dag = dag
)

Now this PostgresOperator will run the SQL file 'sql_file.sql' by first searching for it in '/opt/airflow/sql_files'. That means Airflow will actually run '/opt/airflow/sql_files/sql_file.sql'.

Note: postgres_conn_id = 'postgres_conn' is the name of the Postgres connection created in the Airflow UI. To know how to create the connection, look at Part 1 of this series.
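Putting the DAG definition and the task together, a minimal self-contained DAG file might look roughly like the sketch below. The imports and the default_arguments dictionary are my own assumptions for illustration; the import path shown is the Airflow 1.10 style, while in Airflow 2 the PostgresOperator comes from the postgres provider package.

from datetime import datetime, timedelta

from airflow import DAG
# Airflow 1.10-style import; in Airflow 2 use:
# from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.postgres_operator import PostgresOperator

# Assumed default arguments, only for illustration
default_arguments = {
    'owner': 'airflow',
    'start_date': datetime(2020, 12, 6),
}

dag = DAG(
    "demo_dag",
    default_args = default_arguments,
    description = "Demo dag definition for template_searchpath",
    # Jinja will look for 'sql_file.sql' inside this folder
    template_searchpath = ['/opt/airflow/sql_files'],
    schedule_interval = timedelta(hours = 12, minutes = 30),
    catchup = False
)

# Resolves to /opt/airflow/sql_files/sql_file.sql at render time
task = PostgresOperator(
    task_id = 'run_sql_file',
    postgres_conn_id = 'postgres_conn',
    sql = 'sql_file.sql',
    dag = dag
)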

If you do not want the template path hard-coded in your DAG file (so that not everyone can read it), or if you are using the same path in multiple DAGs, then it is better to save it as an Airflow Variable.

sql_folder_path = Variable.get("sql_path")

dag = DAG(
    "demo_dag",
    default_args = default_arguments,
    description = "Demo dag definition for template_searchpath",
    template_searchpath = [sql_folder_path],
    schedule_interval = timedelta(hours = 12, minutes = 30),
    catchup = False
)

Here

sql_folder_path = Variable.get("sql_path")

is the value of the Airflow Variable 'sql_path' stored in Airflow, as shown below.

Variable, Airflow UI | Image by Author

Note: To understand Airflow Variables, look at Part 1 of this series.

The code for the task remains the same:

task = PostgresOperator(
    task_id = 'run_sql_file',
    postgres_conn_id = 'postgres_conn',
    sql = 'sql_file.sql',
    dag = dag
)

2. Running Python Scripts (with bash)

Many automation jobs require running a Python script. It is one of the most common tasks, whether you are running a complex pipeline or just a small personal project. The go-to method for this has always been to create a batch file and schedule it to run daily (or at any desired interval). But batch files have their drawbacks, one being that they are not considered good practice when the job grows large. This is where Airflow comes in: with Airflow you can run any Python file by creating a task among other tasks and run the scripts in any order.

To run a Python script, the best method is to use the BashOperator and not the PythonOperator; the reason is explained later in this section.

Note: I am talking about running Python Script(File) and not Python Function(def).

task = BashOperator(
    task_id = "run_python_script",
    bash_command = 'python /opt/airflow/python_files/File.py',
    dag = dag
)

You just have to write the command that you would use to run a Python file in a cmd/bash terminal in the bash_command argument of the BashOperator.

The command: ‘python <absolute_path_to_file>’
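As a side note (my own illustration, not part of the original snippet), bash_command is a templated field, so you can pass Airflow's built-in Jinja values, such as the execution date {{ ds }}, to the script as a command-line argument:

# A sketch assuming File.py reads the date from sys.argv
task = BashOperator(
    task_id = "run_python_script_with_date",
    bash_command = 'python /opt/airflow/python_files/File.py {{ ds }}',
    dag = dag
)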

You can use the PythonOperator to run Python scripts as well, but there are a couple of modifications that you need to make, and there are more steps to account for.

  1. First, you need to wrap the entire code saved inside the Python file into a python callable, i.e. convert it into a Python function, because the PythonOperator cannot run a Python file directly; it can only run a python callable.
  2. Next, you need to turn this Python file, containing your code inside a python callable (usually a function), into a Python package. 🙄
  3. Now that the package is created, you need to import the Python callable from this package into your main DAG file along with the other imports. Further steps can come into the picture depending on how you are implementing Airflow and what your code contains. (A minimal sketch of this approach follows this list.)
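Here is what that could look like; the package name python_files and the function name main are hypothetical, used only for illustration, and the import path for PythonOperator is the Airflow 1.10 style.

# Airflow 1.10-style import; in Airflow 2 it is airflow.operators.python
from airflow.operators.python_operator import PythonOperator

# Hypothetical package: File.py's top-level code wrapped into a function main()
from python_files.File import main

run_script_task = PythonOperator(
    task_id = "run_python_callable",
    python_callable = main,   # the wrapped function, not the file itself
    dag = dag
)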

So it is much better to use the BashOperator than the PythonOperator to run Python scripts.

3. Running SQL file

This is one thing that took me more time than expected to figure out how to make work in the best possible way. There are a couple of ways, but not all of them are recommended.

When you need to run regular SQL operations, like extracting daily data for your client or populating a derived table from a base table by running SQL scripts, this may be the most appropriate way to do it. Here I am using the PostgresOperator (obviously, because I'm working with Postgres 🙂), but this applies to all the SQL operators.

Let's understand this with an example. Suppose I have a DAG that runs a query against my Postgres database and extracts data into a csv file, which will be used further for analysis, for working in Python (pandas), and so on.

Suppose I have the Airflow directory similar to this structure.

├── dags
│ └── dag_file.py
├── data
│ └── postgres_data.csv
├── plugins
├── sql_files
│ └── sql_file.sql
└── .env

where dag_file.py is the file which contains the DAG, sql_file.sql is the file that contains the Postgres query to extract the data, and postgres_data.csv is the file where I store the queried data.

My dag looks like this

< import statements >

local_tz = pendulum.timezone('Asia/Kolkata')

default_arguments = {
    'owner' : "Devesh",
    'depends_on_past' : False,
    'start_date': datetime(2020, 12, 6, tzinfo = local_tz),
    # 'retries': 0,
    # 'retry_delay' : timedelta(minutes = 3)
}

sql_folder_path = Variable.get("sql_path")
postgres_data_path = Variable.get("postgres_data_path")
retention_sql = 'sql_file.sql'

dag = DAG(
    "sample_dag_postgres_operator",
    default_args = default_arguments,
    description = "sample dag to understand PostgresOperator",
    template_searchpath = [sql_folder_path],
    schedule_interval = timedelta(hours = 11, minutes = 30),
    catchup = False
)

def date_range_func(**kwargs):
    today_date = date.today()
    data_date = today_date - timedelta(days = 1)
    month_first_date = today_date.replace(day = 1)
    date_from = month_first_date + relativedelta(days = -70)
    date_till = data_date
    d1 = kwargs["ti"]
    d1.xcom_push(key = "date_from", value = date_from)
    d1.xcom_push(key = "date_till", value = date_till)
    print("date range completed")

date_range_task = PythonOperator(
    task_id = "date_range",
    provide_context = True,
    python_callable = date_range_func,
    dag = dag
)

run_postgres_task = PostgresOperator(
    task_id = "postgres_operator",
    provide_context = True,
    postgres_conn_id = "postgres_conn",
    params = {"schema_table": 'schema_name.table_name'},
    sql = 'sql_file.sql',
    dag = dag
)

A lot is going on here (template_searchpath, Airflow Variables, XCom, Airflow Connections):

  • sql_folder_path and postgres_data_path are the path locations of sql_file.sql and postgres_data.csv respectively; they are stored as Airflow Variables and then read here in the DAG using Variable.get() (a short note on creating these Variables follows this list).
  • xcom_push pushes the date range calculated by the PythonOperator, which is then pulled by the PostgresOperator inside the Postgres query.
  • params in the PostgresOperator is just an additional step included to explain how one can parameterize a SQL query (just like %s is used).
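For reference, these Variables can be created from the Admin > Variables page of the Airflow UI (as shown in Part 1) or from the CLI. The commands below use an Airflow 2-style CLI and illustrative values of my own choosing; Airflow 1.10 uses flag-style arguments (airflow variables --set) instead.

airflow variables set sql_path /opt/airflow/sql_files
airflow variables set postgres_data_path /opt/airflow/data/postgres_data.csv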

Note: Refer to this post, where Airflow Variable, XCOM and Airflow Connections are explained in detail.

Here there are 2 tasks (date_range_task and run_postgres_task).

  1. date_range_task: This task runs a Python function date_range_func, which computes a date range and pushes it to XCom as date_from and date_till.
  2. run_postgres_task: This is the task that runs the SQL file sql_file.sql stored inside the sql_files folder (the ordering between the two tasks is sketched right after this list).
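One thing the DAG code above does not show explicitly is the dependency between these two tasks. Since run_postgres_task pulls the XCom values pushed by date_range_task, it must run after it; a minimal way to express that with the standard bitshift syntax would be:

# Make sure the date range is pushed to XCom before the SQL file is rendered and run
date_range_task >> run_postgres_task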

For simplicity, my SQL query only runs a copy statement with a where clause; sql_file.sql contains the following query.

copy (select * from {{ params.schema_table }} where date between '{{ task_instance.xcom_pull(task_ids = "date_range", key = "date_from") }}' and '{{ task_instance.xcom_pull(task_ids = "date_range", key = "date_till") }}' and revenue > 7) to 'postgres_data.csv' delimiter ',' csv header;

In this Query,

{{ }} is the Jinja templating syntax that Airflow uses to dynamically insert values into the query at render time.

Note: To know more about PostgresOperator or params you can read that from The Postgres Operator: All you need to know (you can also read about argument parameter). For more on Jinja templating read that from here.

If, for example, date_range_task pushes date_from = '2020-12-07' and date_till = '2020-12-08', then the rendered templated query will look like

copy (select * from schema_name.table_name where date between '2020-12-07' and '2020-12-08' and revenue > 7) to 'postgres_data.csv' delimiter ',' csv header;

Here in the query, I parameterized the table name just to show how one can use params to parameterize anything in the query.

4. Catchup

The word defines its purpose, i.e. to catch up on the runs that should have happened since the last Airflow run. It is one of the most controversial features of Airflow. Why? Let's understand it first.

This helps Airflow run all the DAG runs (the ones that haven't been executed) between the last executed DAG run and the current DAG run. Let's understand this with an example.

local_tz = pendulum.timezone('Asia/Kolkata')

default_arguments = {
    'owner' : "owner_name",
    'depends_on_past' : False,
    'start_date': datetime(2020, 1, 6, tzinfo = local_tz),
    'retries': 2,
    'retry_delay' : timedelta(minutes = 30)
}

dag = DAG(
    "demo_dag",
    default_args = default_arguments,
    description = "demo dag for catchup",
    template_searchpath = [sql_path],
    schedule_interval = timedelta(minutes = 30),
    catchup = False
)

As seen, catchup is defined in the DAG definition. Suppose your demo_dag last ran on 21 January 2020 and, for some reason, it hasn't run since or has been paused. On 6 January 2021 you realize the DAG wasn't active; then, by just setting catchup = True, you can make your DAG execute all the missed runs from nearly a full year (21 January 2020 to 6 January 2021).

Now comes the controversial part: from the above example you can figure out the number of DAG runs Airflow has to execute simultaneously in order to catch up (with a 30-minute schedule, many thousands). This would either crash something or make the process very slow. So it is recommended to keep catchup set to False.

But if you do want to catch up, you can still use the Airflow CLI command airflow backfill. It is considered the best way to catch up on any non-triggered DAG runs.
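As a rough illustration (this assumes the Airflow 1.10-style CLI; in Airflow 2 the equivalent command is airflow dags backfill), backfilling the missed window from the example above would look something like:

airflow backfill -s 2020-01-21 -e 2021-01-06 demo_dag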

That’s all for today.
Thank you for the read and staying with me for so long. I am going to be writing more beginner-friendly posts in the future too. For more posts like this, you can follow me on Medium. I welcome feedback and constructive criticism and can be reached on Linkedin.
Thanks for your time. Keep Learning and Keep growing.

Thank You!

Happy Learning :)

