Implémenter un ETL avec Apache Airflow
Lorsqu'il s'agit d'orchestrer des flux de travail data de plus en plus complexes avec Python, Apache Airflow est la solution qu'il vous faut. Il permet de créer des interfaces web pour les modèles de machine learning.
Apache Airflow#
Apache Airflow est une plateforme d'orchestration qui permet créer des flux de travail (pipelines de données), plannifier et suivre leurs exécution, à travers une interface de programmation, le langage Python.
Airflow a été inialement développé par Airbnb en octobre 2014 pour orchestrer leurs flux de travail. Depuis mars 2016, Airflow a été repris par la fondation Apache.
Les avantages de Airflow:#
- Une plateforme tout en un (conception, planification et monitoring)
- Gestion des credentials des bases de données (la plupart des sources de données)
- Gestion des utilisateurs
- Utilisation des packages Python
- Traçabilité des exécutions et Facilité de débogage
Les composantes#
Apache Airflow est composée de trois composantes
Les composants d'Airflow#
Airflow Component | Container |
---|---|
Web Server | apache-airflow-airflow-webserver |
Scheduler | apache-airflow-airflow-scheduler |
Executor | apache-airflow-airflow-worker , apache-airflow-airflow-triggerer |
Queue | apache-airflow-redis |
Workers | apache-airflow-airflow-worker |
Metadatabase | apache-airflow-postgres |
Direct Acyclic Graph#
Un DAG est écrit en Python, il décrit les étapes d'un workflow.
À quoi ressemble un workflow#
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.postgres_operator import PostgresOperator
RUN_DATE = "{{ execution_date }}" # Scheduled Date
# Define default_args and DAG
default_args = {
"owner": "admin",
"start_date": datetime(2024, 5, 7),
"retries": 0,
"retry_delay": timedelta(minutes=15),
}
dag = DAG(
dag_id="my_first_dag",
default_args=default_args,
description="First DAG,
schedule_interval="0 6 * * *",
tags=[""],
catchup=False,
)
start_task = DummyOperator(task_id="start", dag=dag)
fecth_sales = PostgresOperator(
task_id="fecth_fact_sales",
postgres_conn_id="sales_db,
sql=f"select public.fn_fetch_sales('{RUN_DATE}'::date)",
dag=dag,
)
end_task = DummyOperator(task_id="end", dag=dag)
# Flow
start_task >> fecth_sales >> end_task
Dans cette formation, je vous montre comment créer un ETL Apache Airflow dans le cadre de l'intégration de données. Rendez-vous pour installer Airflow.