What is a DAG? What is the main difference between a DAG and a pipeline?
If you’ve previously visited our blog then you couldn’t have missed “Apache Airflow – Start your journey as Data Engineer and Data Scientist”. You probably already know what the abbreviation DAG means, but let’s explain it again.
A DAG (Directed Acyclic Graph) is a data pipeline that contains one or more tasks with no loops between them. That is the main difference from a generic pipeline: in a DAG, tasks are nodes and their dependencies are directed edges that can never loop back on themselves.
Now that you know what DAG is, let me show you how to write your first Directed Acyclic Graph following all best practices and become a true DAG master! 🙂
Timezones in Airflow and what can go wrong with them
The timezone in Airflow is set to UTC by default, so all times you observe in the Airflow Web UI are in UTC. It is highly recommended not to change this.
Why is this so important?
Dealing with time zones, in general, can become a real nightmare if they are not set correctly. Understanding how timezones in Airflow work is important since you may want to schedule your DAGs according to your local time zone, which can lead to surprises when DST (Daylight Saving Time) happens. You should be able to trigger your DAGs at the expected time no matter which time zone is used.
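For instance, here is a minimal sketch of a timezone-aware DAG using pendulum; the timezone "Europe/Sofia" and the DAG name are only illustrative assumptions, not something prescribed by Airflow:
import pendulum
from datetime import datetime
from airflow.models import DAG

# "Europe/Sofia" is only an example timezone -- pick your own local one.
local_tz = pendulum.timezone("Europe/Sofia")

with DAG('itgix_dag_local_tz_example',
         schedule_interval='@daily',
         # An aware start_date lets the scheduler follow local time (including DST),
         # while the Web UI keeps displaying times in UTC.
         start_date=datetime(2022, 4, 1, tzinfo=local_tz),
         catchup=False) as dag:
    pass  # tasks go here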
How to write DAGs following all best practices
- Set up default_args in DAGs
If you have settings or arguments that should apply to all of your tasks, the best practice is to define them once in a default_args dictionary and pass it to the DAG object, rather than repeating them in every task. A related recommendation is to keep any code that is not part of the DAG definition out of the top level of the file. You can create the dictionary like so:
default_args = {
    'owner': 'ITGix Data Engineers',
    'email_on_failure': False,
    'retries': 2
}
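To see how this works in practice, here is a minimal sketch (the DAG and task names are only illustrative) of passing default_args to a DAG so that every task inherits the arguments unless it overrides a value:
from datetime import datetime
from airflow.models import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'ITGix Data Engineers',
    'email_on_failure': False,
    'retries': 2
}

with DAG('itgix_default_args_example', schedule_interval='@daily',
         start_date=datetime(2022, 4, 1), catchup=False,
         default_args=default_args) as dag:

    # Inherits owner, email_on_failure and retries from default_args
    extract = BashOperator(task_id='extract', bash_command='echo "extract"')

    # A task-level argument overrides the default (5 retries instead of 2)
    load = BashOperator(task_id='load', bash_command='echo "load"', retries=5)

    extract >> load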
- Versioning of the DAG in the DAG’s name
As of now, Airflow has no built-in mechanism for DAG versioning. For the time being, it’s good practice to put a version suffix in the DAG name, e.g. ‘itgix_dag_v_1_0_0’.
When you update your DAG you can change the DAG’s name and therefore its version. This will allow you to easily see which DAG version has been triggered.
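As a small sketch (the name and version suffix are purely a naming convention, not an Airflow feature), a release that changes the DAG’s logic would simply bump the suffix:
from datetime import datetime
from airflow.models import DAG
from airflow.operators.bash import BashOperator

# When the DAG's logic changes, rename it (e.g. to 'itgix_dag_v_1_1_0')
# so it is obvious in the Web UI which version produced which runs.
with DAG('itgix_dag_v_1_0_0', schedule_interval='@daily',
         start_date=datetime(2022, 4, 1), catchup=False) as dag:
    BashOperator(task_id='extract', bash_command='echo "v1.0.0 logic"')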
- Set up schedule_interval and start_date for the DAG
You should specify the schedule_interval argument for each DAG object. It defines how often your DAG is triggered (daily, monthly, etc.). Another crucial argument for each DAG object is start_date: the date from which your DAG starts being scheduled. Keep in mind that start_date becomes especially important if the next argument, catchup, is set to True.
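For example, here is a minimal sketch (the DAG name is illustrative) of a DAG that runs once a day starting from 1 April 2022; a cron expression such as '0 6 * * *' would also work for schedule_interval:
from datetime import datetime
from airflow.models import DAG

with DAG('itgix_schedule_example',
         schedule_interval='@daily',        # how often the DAG is triggered
         start_date=datetime(2022, 4, 1),   # first date to be scheduled
         catchup=False) as dag:
    pass  # tasks go here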
Let’s jump to the next arguments and see for ourselves why those arguments are essential.
- Set up the parameter catchup=False – why it is so important for each DAG
Let’s imagine that your DAG has a configured start_date of datetime(2021, 4, 1) and you trigger your DAG for the first time on 01.04.2022.
By default, catchup is set to True in Airflow, so when the DAG is triggered for the first time, Airflow will create DAG Runs for the whole missed year (from 01.04.2021 up to the current date). That’s why it is so important to set the catchup parameter to False on each DAG object.
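A minimal sketch (dates and DAG name are illustrative) showing the argument in place; with catchup=True instead, the scheduler would create a run for every missed daily interval since the start_date:
from datetime import datetime
from airflow.models import DAG

with DAG('itgix_catchup_example',
         schedule_interval='@daily',
         start_date=datetime(2021, 4, 1),   # one year in the past
         catchup=False) as dag:             # only the latest interval is scheduled
    pass  # tasks go here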
Keep this in mind and let’s move to the next arguments.
- Add Tags to your DAG
The tags argument is very useful, and I recommend setting it for each DAG object, especially if you have many teams or would like to filter your DAGs quickly and easily in the Airflow Web UI.
For the tags argument you can specify a list of tags: tags=["data_science", "data"].
- Add Description of DAG
Another best practice is adding a meaningful description to your DAGs that describes what each DAG does. The description argument can be, for example: description="DAG is used to store data".
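Both arguments go straight on the DAG object, as in this small sketch (names are illustrative):
from datetime import datetime
from airflow.models import DAG

with DAG('itgix_metadata_example',
         schedule_interval='@daily',
         start_date=datetime(2022, 4, 1),
         catchup=False,
         tags=["data_science", "data"],            # filterable in the Web UI
         description="DAG is used to store data"   # shown in the DAG list
         ) as dag:
    pass  # tasks go here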
- Set up argument dagrun_timeout
One of the most important arguments is dagrun_timeout. Let’s imagine that your DAG Run usually takes around 5 minutes. It is a best practice to mark the DAG Run as failed if it has not completed after, say, 6 or 7 minutes.
How can you do it? With the dagrun_timeout argument, e.g. dagrun_timeout=timedelta(minutes=6).
Otherwise, if one of your tasks gets stuck, your DAG Run will keep running indefinitely.
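Note that timedelta comes from the standard datetime module. A minimal sketch (DAG name illustrative):
from datetime import datetime, timedelta
from airflow.models import DAG

with DAG('itgix_timeout_example',
         schedule_interval='@daily',
         start_date=datetime(2022, 4, 1),
         catchup=False,
         # Mark the whole DAG Run as failed if it is still running after 6 minutes
         dagrun_timeout=timedelta(minutes=6)) as dag:
    pass  # tasks go here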
- How to configure dependency between tasks in one DAG?
Let’s talk about how you can configure dependencies between tasks in a DAG. You can use set_upstream() and set_downstream() functions, or you can use << and >> operators. The method you choose is a matter of personal preference, but for readability, it’s best practice to choose one method and stick with it.
For example, instead of mixing methods like this:
task_1.set_downstream(task_2)
task_3.set_upstream(task_2)
task_3 >> task_4
Try to be consistent with something like this:
task_1 >> task_2 >> [task_3, task_4]
Finally, your first DAG is ready to run in Apache Airflow. The full DAG code in a Python file can be found below:
from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'ITGix Data Engineers',
    'email_on_failure': False,
    'retries': 2
}

def clean_a():
    print("cleaning with ml a")

def clean_b():
    print("cleaning with ml b")

def clean_c():
    print("cleaning with ml c")

def ml_a():
    clean_a()
    print("process with ml a")

def ml_b():
    clean_b()
    print("process with ml b")

def ml_c():
    clean_c()
    print("process with ml c")

with DAG('itgix_dag_v_1_0_0', schedule_interval='@daily',
         start_date=datetime(2022, 4, 1), default_args=default_args,
         catchup=False, tags=["data_science", "data"],
         description="DAG is used to store data",
         dagrun_timeout=timedelta(minutes=6)) as dag:

    extract = BashOperator(
        task_id='extract',
        bash_command='echo "this command extracts my data"'
    )

    process_ml_a = PythonOperator(
        task_id='process_ml_a',
        python_callable=ml_a
    )

    process_ml_b = PythonOperator(
        task_id='process_ml_b',
        python_callable=ml_b
    )

    process_ml_c = PythonOperator(
        task_id='process_ml_c',
        python_callable=ml_c
    )

    store = DummyOperator(
        task_id='store_data'
    )

    extract >> [process_ml_a, process_ml_b, process_ml_c] >> store
Should we follow the principle “One task defines One operator”? And why is that so important?
Let’s imagine that you try to cram two tasks into a single operator, as sketched below:
process_ml_a = PythonOperator(
    task_id='process_ml_a',
    python_callable=ml_a,
    task_id='process_ml_b',
    python_callable=ml_b
)
In this case, if for example the ‘process_ml_b’ part fails, the whole task has to be retried, re-running ‘process_ml_a’ even though it has already been executed successfully.
If you follow the rule (see below) and task ‘process_ml_b’ doesn’t work as expected, you can retry only that particular task and easily troubleshoot it.
process_ml_a = PythonOperator(
    task_id='process_ml_a',
    python_callable=ml_a
)
process_ml_b = PythonOperator(
    task_id='process_ml_b',
    python_callable=ml_b
)
How to share data between tasks?
Of course, sometimes you need to share data from one task to another and the first question in your mind is “How can I share data between tasks?”
The short answer is: with XComs, but there are a few things you should know.
XCom stands for “cross-communication” and allows you to exchange small amounts of data between tasks. Since XComs are stored in the Airflow metastore, their size is limited (a short example follows the list of limits below).
The limit depends on which database backs the metastore:
- SQLite – 2 GB per XCom value;
- PostgreSQL – 1 GB per XCom value;
- MySQL – 64 KB per XCom value.
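Here is a minimal sketch of sharing a small value between two tasks through XComs; the DAG, task and key names are made up for illustration:
from datetime import datetime
from airflow.models import DAG
from airflow.operators.python import PythonOperator

def _extract(ti):
    # Push a small value into the metastore under the key 'row_count'
    ti.xcom_push(key='row_count', value=42)

def _process(ti):
    # Pull the value pushed by the 'extract' task
    row_count = ti.xcom_pull(key='row_count', task_ids='extract')
    print(f"processing {row_count} rows")

with DAG('itgix_xcom_example', schedule_interval='@daily',
         start_date=datetime(2022, 4, 1), catchup=False) as dag:

    extract = PythonOperator(task_id='extract', python_callable=_extract)
    process = PythonOperator(task_id='process', python_callable=_process)

    extract >> process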
Don’t use subDAGs, use only TaskGroups
If you want to group two or more tasks on the same level into a single node, you could use SubDAGs. They were often used in the past; however, it is now highly recommended not to use them at all. There are three reasons behind this:
- Deadlocks: a SubDAG occupies a worker slot while its inner tasks wait for slots of their own, which can deadlock your Airflow instance so that no tasks can run
- Complexity: setting up SubDAGs adds unnecessary complexity to your code
- Sequential executor: tasks inside a SubDAG are executed with the SequentialExecutor by default, no matter what kind of executor your Airflow instance uses
Since Airflow 2.0 you can use a TaskGroup to combine two or more tasks on the same level into one group. Below you can find a DAG that uses a TaskGroup to combine three tasks into one group called “processing_tasks”:
from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup

default_args = {
    'owner': 'ITGix Data Engineers',
    'email_on_failure': False,
    'retries': 2
}

def clean_a():
    print("cleaning with ml a")

def clean_b():
    print("cleaning with ml b")

def clean_c():
    print("cleaning with ml c")

def ml_a():
    clean_a()
    print("process with ml a")

def ml_b():
    clean_b()
    print("process with ml b")

def ml_c():
    clean_c()
    print("process with ml c")

with DAG('itgix_dag_v_1_0_0', schedule_interval='@daily',
         start_date=datetime(2022, 4, 1), default_args=default_args,
         catchup=False, tags=["data_science", "data"],
         description="DAG is used to store data",
         dagrun_timeout=timedelta(minutes=6)) as dag:

    extract = BashOperator(
        task_id='extract',
        bash_command='echo "this command extracts my data"'
    )

    with TaskGroup('processing_tasks') as processing_tasks:
        process_ml_a = PythonOperator(
            task_id='process_ml_a',
            python_callable=ml_a
        )

        process_ml_b = PythonOperator(
            task_id='process_ml_b',
            python_callable=ml_b
        )

        process_ml_c = PythonOperator(
            task_id='process_ml_c',
            python_callable=ml_c
        )

    store = DummyOperator(
        task_id='store_data'
    )

    extract >> processing_tasks >> store
Conclusion
Keep all these principles and best practices in mind and start writing your first DAG! Become the DAG master in your company, or just ask us to help you skyrocket your Apache Airflow up high in space.