Was ist DAG? Was ist der Hauptunterschied zwischen DAG und Pipeline?
Wenn Sie unseren Blog schon einmal besucht haben, dann haben Sie sicher nicht verpasst "Apache Airflow - Starten Sie Ihre Reise als Data Engineer und Data Scientist". Sie wissen wahrscheinlich schon, was die Abkürzung DAG bedeutet, aber lassen Sie es uns noch einmal erklären.
DAG (gerichteter azyklischer Graph) ist eine Datenpipeline, die eine oder mehrere Aufgaben enthält, die keine Schleifen zwischen ihnen haben.
Nun, da Sie wissen, was ein DAG ist, möchte ich Ihnen zeigen, wie Sie Ihren ersten gerichteten azyklischen Graphen nach allen bewährten Verfahren schreiben und ein echter DAG-Meister werden können 🙂 .
Die Zeitzone in Airflow und was mit ihr schief gehen kann
Die Zeitzonen in Airflow sind standardmäßig auf UTC eingestellt, so dass alle Zeiten, die Sie in Airflow Web UI beobachten, in UTC angegeben sind. Es wird dringend empfohlen, sie nicht zu ändern.
Warum ist das so wichtig?
Der Umgang mit Zeitzonen im Allgemeinen kann zu einem echten Albtraum werden, wenn sie nicht korrekt eingestellt sind. Es ist wichtig zu verstehen, wie die Zeitzonen in Airflow funktionieren, da Sie Ihre DAGs möglicherweise nach Ihrer lokalen Zeitzone planen möchten, was zu Überraschungen führen kann, wenn die Sommerzeit eingeführt wird. Sie sollten in der Lage sein, Ihre DAGs zur erwarteten Zeit auszulösen, unabhängig davon, welche Zeitzone verwendet wird.
Wie man DAGs nach allen bewährten Verfahren schreibt
- default_args in DAGs einrichten
Wenn Sie Einstellungen, Argumente oder Informationen auf alle Ihre Aufgaben anwenden müssen, ist es eine bewährte Praxis und Empfehlung, Top-Level-Code zu vermeiden, der nicht Teil Ihrer DAG ist, und default_args einzurichten. Sie können ein Wörterbuch mit allen Argumenten wie folgt erstellen:
default_args ={
'owner': 'ITGix Data Engineers',
'email_on_failure': False,
'retries': 2
}
- Versionierung der DAG im Namen der DAG
Im Moment gibt es keine andere Möglichkeit für die Versionierung von DAGs. Im Moment ist es gut, ein Versionierungssuffix in den DAG-Namen einzufügen, z. B. "itgix_dag_v_1_0_0".
Wenn Sie Ihre DAG aktualisieren, können Sie den Namen der DAG und damit ihre Version ändern. So können Sie leicht erkennen, welche DAG-Version ausgelöst wurde.
- Einrichten von schedule_interval und start_data für DAG
Sie sollten das Argument schedule_interval für jedes DAG-Objekt angeben. Es bestimmt die Häufigkeit, mit der Ihre DAG ausgelöst wird (täglich, monatlich usw.). Ein weiteres wichtiges Argument für jedes DAG-Objekt ist das start_date. Es ist das Datum, ab dem Ihre DAG geplant werden soll. Beachten Sie, dass dies der wichtigste Teil ist, insbesondere wenn das nächste Argument catchup gleich TRUE ist.
Gehen wir zu den nächsten Argumenten über und sehen wir selbst, warum diese Argumente wesentlich sind.
- Parameter Catchup=false einrichten - warum ist es so wichtig, Argumente für jeden DAG einzurichten?
Nehmen wir an, Ihre DAG hat ein konfiguriertes start_date(2021, 4, 1) und Sie lösen Ihren DAG-Lauf zum ersten Mal am 01.04.2022 aus.
Standardmäßig ist Catchup in Airflow auf TRUE eingestellt und wenn Sie DAG zum ersten Mal auslösen, wird Airflow DAG RUNs für ein Jahr auslösen (vom 01.04.2021 bis zum aktuellen Datum). Aus diesem Grund ist es so wichtig, dass der Parameter catchup für jedes DAG-Objekt auf FALSE gesetzt wird.
Behalten Sie dies im Hinterkopf und lassen Sie uns zu den nächsten Argumenten übergehen.
- Tags zu Ihrer DAG hinzufügen
Das Tag-Argument ist sehr nützlich und ich empfehle, es für jedes DAG-Objekt einzurichten, insbesondere wenn Sie viele Teams haben oder Ihre DAGs in der Airflow Web UI schnell und einfach filtern möchten.
Für das Argument tag können Sie eine Liste von Tags angeben: tags=["data_science", "data"] .
- Beschreibung der DAG hinzufügen
Ein weiteres bewährtes Verfahren ist das Hinzufügen einer aussagekräftigen Beschreibung zu Ihren DAGs, um die Funktion Ihres DAGs bestmöglich zu beschreiben. Das Beschreibungsargument kann sein: description="DAG wird zum Speichern von Daten verwendet".
- Argument dagrun_timeout einrichten
Einige der wichtigsten Argumente sind dagrun_timeout. Nehmen wir an, dass Ihr DAG-Lauf beispielsweise etwa 5 Minuten dauert. Es empfiehlt sich, den DAG-Lauf als Fehlschlag zu markieren, wenn er nach 6 oder 7 Minuten noch nicht abgeschlossen ist.
Wie können Sie das tun? Die Antwort ist mit dem Argument dagrun_timeout wie dagrun_timeout=timedelta(minutes=6).
Andernfalls wird Ihre DAG ewig laufen, wenn eine Ihrer Aufgaben stecken bleibt.
- Wie konfiguriert man Abhängigkeiten zwischen Tasks in einer DAG?
Let’s talk about how you can configure dependencies between tasks in a DAG. You can use set_upstream() and set_downstream() functions, or you can use << and >> operators. The method you choose is a matter of personal preference, but for readability, it’s best practice to choose one method and stick with it.
Zum Beispiel, anstatt Methoden wie diese zu mischen:
task_1.set_downstream(task_2)
task_3.set_upstream(task_2)
task_3 >> task_4
Versuchen Sie, mit so etwas konsequent zu sein:
task_1 >> task_2 >> [task_3, task_4]
Schließlich können Sie sagen, dass Ihre erste DAG bereit ist, in Apache Airflow ausgeführt zu werden. Den vollständigen Code der DAG in einer Python-Datei finden Sie unten:
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from datetime import datetime
default_args ={
'owner': 'ITGix Data Engineers',
'email_on_failure': False,
'retries': 2
}
def clean_a():
print("cleaning with ml a")
def clean_b():
print("cleaning with ml a")
def clean_c():
print("cleaning with ml a")
def ml_a():
clean_a()
print("process with ml a")
def ml_b():
clean_b()
print("process with ml b")
def ml_c():
clean_b()
print("process with ml c")
with DAG('itgix_dag_v_1_0_0', schedule_interval='@daily',
start_date=datetime(2022, 4, 1), default_args=default_args,
catchup=False, tags=["data_science", "data"],
description="DAG is used to store data",
dagrun_timeout=timedelta(minutes=6)) as dag:
extract = BashOperator(
task_id='extract',
bash_command='echo "this commands extract my data"'
)
process_ml_a = PythonOperator(
task_id='process_ml_a',
python_callable=ml_a
)
process_ml_b = PythonOperator(
task_id='process_ml_b',
python_callable=ml_b
)
process_ml_c = PythonOperator(
task_id='process_ml_c',
python_callable=ml_c
)
store = DummyOperator(
task_id='store_data'
)
extract >> [process_ml_a, process_ml_b, process_ml_c] >> store
Sollten wir dem Grundsatz "Eine Aufgabe definiert einen Betreiber" folgen? Und warum ist das so wichtig?
Stellen wir uns vor, Sie definieren zwei Aufgaben mit einem Operator, wie Sie unten sehen können:
process_ml_a = PythonOperator(
task_id='process_ml_a',
python_callable=ml_a
task_id='process_ml_b',
python_callable=ml_b
)
In diesem Fall, wenn z.B. die Aufgabe 'process_ml_b' nicht wie erwartet funktioniert hat, werden Sie diese Aufgabe erneut versuchen, auch wenn 'process_ml_a' erfolgreich ausgeführt wurde.
Wenn Sie die Regel (siehe unten) befolgen und die Aufgabe "process_ml_b" nicht wie erwartet funktioniert hat, können Sie nur diese bestimmte Aufgabe erneut versuchen und das Problem leicht beheben.
process_ml_a = PythonOperator(
task_id='process_ml_a',
python_callable=ml_a
)
process_ml_a = PythonOperator(
task_id='process_ml_b',
python_callable=ml_b
)
Wie können Daten zwischen Aufgaben ausgetauscht werden?
Natürlich müssen Sie manchmal Daten von einer Aufgabe an eine andere weitergeben, und die erste Frage, die Ihnen in den Sinn kommt, lautet: "Wie kann ich Daten zwischen Aufgaben austauschen?"
Die kurze Antwort lautet: mit Xcoms, aber Sie sollten einige Tipps kennen.
XComs wird für die "Querkommunikation" verwendet und ermöglicht den Austausch kleiner Datenmengen zwischen Aufgaben. Da XComs Metastore zum Speichern Ihrer Daten verwendet, sind ihm einige Grenzen gesetzt.
Die Datenmenge hängt davon ab, welche Art von Datenbank für den Metastore verwendet wird:
- SQLite-Begrenzung der Datenmenge - 2 GB;
- PostgreSQL Limit der Datenmenge - 1 GB;
- MySQL Begrenzung der Datenmenge - 64 kB;
Keine subDAGs verwenden, nur TaskGroups verwenden
Wenn Sie zwei oder mehr Aufgaben auf derselben Ebene in einer Aufgabe zusammenführen möchten, können Sie subDAGs verwenden. Dies wurde in der Vergangenheit häufig verwendet, es wird jedoch dringend empfohlen, dies nicht zu tun. Hierfür gibt es drei Gründe
- Deadlocks: Sie können keine Aufgaben in Ihrer Airflow-Instanz ausführen
- Komplexität: der Prozess der Bildung von subDAGs
- Sequentieller Executor: Es spielt keine Rolle, welche Art von Executor Sie verwenden
In Airflow 2.0 können Sie TaskGroup verwenden, um zwei oder mehr Aufgaben auf der gleichen Ebene in einer Aufgabe zu kombinieren. Nachfolgend finden Sie eine DAG, die TaskGroup verwendet, um drei Aufgaben in einer Aufgabe namens "processing_tasks" zu kombinieren:
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime
default_args ={
'owner': 'ITGix Data Engineers',
'email_on_failure': False,
'retries': 2
}
def clean_a():
print("cleaning with ml a")
def clean_b():
print("cleaning with ml a")
def clean_c():
print("cleaning with ml a")
def ml_a():
clean_a()
print("process with ml a")
def ml_b():
clean_b()
print("process with ml b")
def ml_c():
clean_b()
print("process with ml c")
with DAG('itgix_dag_v_1_0_0', schedule_interval='@daily',
start_date=datetime(2022, 4, 1), default_args=default_args,
catchup=False, tags=["data_science", "data"],
description="DAG is used to store data",
dagrun_timeout=timedelta(minutes=6)) as dag:
extract = BashOperator(
task_id='extract',
bash_command='echo "this commands extract my data"'
)
with TaskGroup('processing_tasks') as processing_tasks:
process_ml_a = PythonOperator(
task_id='process_ml_a',
python_callable=ml_a
)
process_ml_b = PythonOperator(
task_id='process_ml_b',
python_callable=ml_b
)
process_ml_c = PythonOperator(
task_id='process_ml_c',
python_callable=ml_c
)
store = DummyOperator(
task_id='store_data'
)
extract >> processing_tasks >> store
Schlussfolgerung
Behalten Sie alle Prinzipien und Best Practices im Hinterkopf und schreiben Sie Ihre erste DAG! Werden Sie zum DAG-Meister in Ihrem Unternehmen oder bitten Sie uns, Ihnen zu helfen, Ihren Apache Airflow in die Höhe zu treiben.