Implémenter un ETL avec Apache Airflow
Lorsqu'il s'agit d'orchestrer des flux de travail data de plus en plus complexes avec Python, Apache Airflow est la solution qu'il vous faut. Il permet de créer des interfaces web pour les modèles de machine learning.
Apache Airflow est une plateforme qui permet
Airflow a été inialement développé par Airbnb en octobre 2014 pour orchestrer leurs flux de travail. Depuis mars 2016, Airflow a été repris par la fondation Apache.
Apache Airflow permet de créer des workflow, de plannifier leurs exécution et d'assurer leur monitoring.
Les avantages de Airflow:
- Une plateforme tout en un (conception, planification et monitoring)
- Gestion des credentials des bases de données (la plupart des sources de données)
- Gestion des utilisateurs
- Utilisation des packages Python
- Traçabilité des exécutions et Facilité de débogage
Les composantes
Apache Airflow est composée de trois composantes
Les composants d'Airflow
Airflow Component | Container |
---|---|
Web Server | apache-airflow-airflow-webserver |
Scheduler | apache-airflow-airflow-scheduler |
Executor | apache-airflow-airflow-worker , apache-airflow-airflow-triggerer |
Queue | apache-airflow-redis |
Workers | apache-airflow-airflow-worker |
Metadatabase | apache-airflow-postgres |
Direct Acyclic Graph
Un DAG est écrit en Python, il décrit les étapes d'un workflow.
À quoi ressemble un workflow
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.postgres_operator import PostgresOperator
RUN_DATE = "{{ execution_date }}" # Scheduled Date
# Define default_args and DAG
default_args = {
"owner": "admin",
"start_date": datetime(2024, 5, 7),
"retries": 0,
"retry_delay": timedelta(minutes=15),
}
dag = DAG(
dag_id="my_first_dag",
default_args=default_args,
description="First DAG,
schedule_interval="0 6 * * *",
tags=[""],
catchup=False,
)
start_task = DummyOperator(task_id="start", dag=dag)
fecth_kyc = PostgresOperator(
task_id="fecth_fact_sales",
postgres_conn_id="sales_db,
sql=f"select public.fn_fetch_sales('{RUN_DATE}'::date)",
dag=dag,
)
end_task = DummyOperator(task_id="end", dag=dag)
# Flow
start_task >> fecth_kyc >> end_task
Dans cette formation, je vous montre comment créer un ETL Apache Airflow dans le cadre de l'intégration de données. Rendez-vous pour installer Airflow.