Aller au contenu

Créer un pipeline de données


Dans ce tutoriel, nous allons explorer comment créer un pipeline de données automatisé en utilisant Apache Airflow, un outil puissant pour la gestion des flux de travail. Notre objectif est de récupérer des données depuis GitHub et de les charger dans une base de données Postgres de manière régulière.

Pour notre premier projet, nous allons créer un pipeline qui récupère des données depuis GitHub et les charge dans une base de données Postgres.

Packages

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.python import PythonOperator

from utils.first_pipeline import extract_data, transform_data, load_data

DAG

dag = DAG(
  dag_id="first_pipeline",
  description="First Apache Airflow Pipeline",
  start_date=datetime(year=2023, month=6, day=18, hour=6, minute=0),
  schedule_interval="30 6 * * *", # Everyday at 6:00 AM
  tags=["demo"]
)

Composants

Start Pipeline

First Pipeline
start_pipeline = EmptyOperator(
    task_id="start_pipeline"
)

Setup table

create_table = PostgresOperator(
  task_id='create_table',
  postgres_conn_id='postgres',
  sql=["""create table if not exists covid19_daily_kpi (
    dt date,
    country varchar(200) not null,
    latitude numeric,
    longitude numeric,
    confirmed integer,
    deaths integer,
    recovered integer,
    active integer,
    load_datetime timestamp not null default now()
  )"""]
)

Extract data

extract = PythonOperator(
    task_id = 'extract_data',
    python_callable = extract_data
)

Transform data

transform = PythonOperator(
    task_id = 'transform_data',
    python_callable = transform_data
)  

Load data

load  = PythonOperator(
    task_id = 'load_data',
    python_callable = load_data
)

End Pipeline

end_pipeline = EmptyOperator(
    task_id="end_pipeline"
)

Orchestraction

start_pipeline >> setup >> extract >> transform >> load >> end_pipeline

Code complet

Apache AIrflow DAG
"""
  First DAG
"""

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.python import PythonOperator

from utils.first_pipeline import extract_data, transform_data, load_data


with DAG(
  dag_id="first_pipeline",
  description="First Apache Airflow Pipeline",
  start_date=datetime(year=2023, month=6, day=18, hour=6, minute=0),
  schedule_interval="30 6 * * *", # Everyday at 6:00 AM
  tags=["demo"]
) as dag:

  start_pipeline = EmptyOperator(
        task_id='start_pipeline',
    )

  create_table = PostgresOperator(
    task_id='create_table',
    postgres_conn_id='postgres',
    sql=["""create table if not exists covid19_daily_kpi (
      dt date,
      country varchar(200) not null,
      latitude numeric,
      longitude numeric,
      confirmed integer,
      deaths integer,
      recovered integer,
      active integer,
      load_datetime timestamp not null default now()
    )"""]
  )

  extract = PythonOperator(
    task_id = 'extract_data',
    python_callable = extract_data
  )

  transform = PythonOperator(
    task_id = 'transform_data',
    python_callable = transform_data
  )  

  load  = PythonOperator(
    task_id = 'load_data',
    python_callable = load_data
  )

  end_pipeline = EmptyOperator(
      task_id='end_pipeline',
  )

  start_pipeline >> create_table >> extract >> transform >> load >> end_pipeline

Bravo, vous avez créé votre premier pipeline avec Airflow. Dans la prochaine section, nous explorerons en détail chaque composant du pipeline et approfondirons votre compréhension d'Apache Airflow.

Partagez sur les réseaux sociaux

Commentaires