Créer un pipeline de données
Dans ce tutoriel, nous allons explorer comment créer un pipeline de données automatisé en utilisant Apache Airflow, un outil puissant pour la gestion des flux de travail. Notre objectif est de récupérer des données depuis GitHub et de les charger dans une base de données Postgres de manière régulière.
Pour notre premier projet, nous allons créer un pipeline qui récupère des données depuis GitHub et les charge dans une base de données Postgres.
Packages
from datetime import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.python import PythonOperator
from utils.first_pipeline import extract_data, transform_data, load_data
DAG
dag = DAG(
dag_id="first_pipeline",
description="First Apache Airflow Pipeline",
start_date=datetime(year=2023, month=6, day=18, hour=6, minute=0),
schedule_interval="@daily",
tags=["demo"]
)
Composants
Start Pipeline
Setup table
setup = PostgresOperator(
task_id='setup',
postgres_conn_id='postgres',
sql=["""create table if not exists covid19_daily_kpi (
dt date,
country varchar(200) not null,
latitude numeric,
longitude numeric,
confirmed integer,
deaths integer,
recovered integer,
active integer,
load_datetime timestamp not null default now()
)"""]
)
Extract data
Transform data
Load data
End Pipeline
Orchestraction
Code complet
Bravo, vous avez créé votre premier pipeline avec Airflow. Dans la prochaine section, nous explorerons en détail chaque composant du pipeline et approfondirons votre compréhension d'Apache Airflow.