Aller au contenu

Implémenter un ETL avec Apache Airflow


Lorsqu'il s'agit d'orchestrer des flux de travail data de plus en plus complexes avec Python, Apache Airflow est la solution qu'il vous faut. Il permet de créer des interfaces web pour les modèles de machine learning.

Apache Airflow est une plateforme qui permet

Airflow a été inialement développé par Airbnb en octobre 2014 pour orchestrer leurs flux de travail. Depuis mars 2016, Airflow a été repris par la fondation Apache.

Apache Airflow permet de créer des workflow, de plannifier leurs exécution et d'assurer leur monitoring.

Les avantages de Airflow:

  • Une plateforme tout en un (conception, planification et monitoring)
  • Gestion des credentials des bases de données (la plupart des sources de données)
  • Gestion des utilisateurs
  • Utilisation des packages Python
  • Traçabilité des exécutions et Facilité de débogage

Les composantes

Apache Airflow est composée de trois composantes

Les composants d'Airflow

Airflow Component Container
Web Server apache-airflow-airflow-webserver
Scheduler apache-airflow-airflow-scheduler
Executor apache-airflow-airflow-worker, apache-airflow-airflow-triggerer
Queue apache-airflow-redis
Workers apache-airflow-airflow-worker
Metadatabase apache-airflow-postgres

Direct Acyclic Graph

Un DAG est écrit en Python, il décrit les étapes d'un workflow.

À quoi ressemble un workflow

DAG Example
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.postgres_operator import PostgresOperator

RUN_DATE = "{{ execution_date }}" # Scheduled Date

# Define default_args and DAG
default_args = {
    "owner": "admin",
    "start_date": datetime(2024, 5, 7),
    "retries": 0,
    "retry_delay": timedelta(minutes=15),
}

dag = DAG(
    dag_id="my_first_dag",
    default_args=default_args,
    description="First DAG,
    schedule_interval="0 6 * * *",
    tags=[""],
    catchup=False,
)

start_task = DummyOperator(task_id="start", dag=dag)

fecth_kyc = PostgresOperator(
    task_id="fecth_fact_sales",
    postgres_conn_id="sales_db,
    sql=f"select public.fn_fetch_sales('{RUN_DATE}'::date)",
    dag=dag,
)

end_task = DummyOperator(task_id="end", dag=dag)

# Flow
start_task >> fecth_kyc >> end_task

Dans cette formation, je vous montre comment créer un ETL Apache Airflow dans le cadre de l'intégration de données. Rendez-vous pour installer Airflow.

Partagez sur les réseaux sociaux

Commentaires