
Interacting with a database

In this section, we look at how Spark connects to a relational database through JDBC, using PostgreSQL as the example.

Prerequisites

To access the PostgreSQL database, we need the PostgreSQL JDBC driver. Go to the Download pgJDBC page. Once the download is complete, place the .jar file in a folder of your choice that is easy to reach from the notebook.
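If you keep several driver versions in the same folder, a small helper can pick the jar to use. The sketch below is a hypothetical helper (find_postgres_driver is not part of Spark or pgJDBC); it assumes the downloaded file keeps its default name, such as postgresql-42.7.3.jar, and returns the highest version it finds.

```python
import re
from pathlib import Path

# Matches default pgJDBC file names such as "postgresql-42.7.3.jar".
_JAR_PATTERN = re.compile(r"^postgresql-(\d+)\.(\d+)\.(\d+)\.jar$")


def find_postgres_driver(folder):
    """Return the path of the highest-versioned pgJDBC jar in `folder` (hypothetical helper)."""
    candidates = []
    for path in Path(folder).iterdir():
        match = _JAR_PATTERN.match(path.name)
        if match:
            # Compare versions numerically, so (42, 10, 0) ranks above (42, 7, 3).
            version = tuple(int(part) for part in match.groups())
            candidates.append((version, path))
    if not candidates:
        raise FileNotFoundError(f"no postgresql-*.jar file found in {folder}")
    _, best = max(candidates, key=lambda item: item[0])
    return str(best)
```

Its result can then replace the hard-coded jar path in the extraClassPath settings of the Spark session.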

import warnings
warnings.filterwarnings("ignore")  # Ignore warnings coming from Arrow optimizations.

Import PySpark

import pyspark
from pyspark.sql import SparkSession
pyspark.__version__
'3.5.1'

Create a Spark session

# Create a Spark session
# The pgJDBC jar is added to both the driver and the executor classpaths
spark = SparkSession.builder \
    .appName("Work with PostgreSQL") \
    .config("spark.driver.extraClassPath", "/home/joekakone/spark/drivers/postgresql-42.7.3.jar") \
    .config("spark.executor.extraClassPath", "/home/joekakone/spark/drivers/postgresql-42.7.3.jar") \
    .getOrCreate()
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/12 17:32:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/05/12 17:32:20 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
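The two extraClassPath settings above point the driver JVM and the executor JVMs at the same jar. As a minimal sketch, the hypothetical helper jdbc_driver_conf below checks that the jar exists before building these settings, so a wrong path fails immediately rather than later, typically as a missing-driver error on the first read from PostgreSQL.

```python
from pathlib import Path


def jdbc_driver_conf(jar_path):
    """Build the Spark settings that put a JDBC driver jar on the classpath.

    Hypothetical helper: it only checks that the jar exists and returns the
    same two keys used in the session above.
    """
    jar = Path(jar_path)
    if not jar.is_file():
        raise FileNotFoundError(f"JDBC driver not found: {jar}")
    path = str(jar)
    return {
        # Classpath of the driver JVM (the one started by the notebook).
        "spark.driver.extraClassPath": path,
        # Classpath of the executor JVMs, which run the JDBC reads and writes.
        "spark.executor.extraClassPath": path,
    }
```

On PySpark 3.4 and later, the dictionary can be passed in one call with SparkSession.builder.config(map=jdbc_driver_conf(path)). These settings are read when the JVM starts, so if a session already exists in the notebook, getOrCreate() returns it and a new classpath will likely not take effect until the kernel is restarted.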

Database Config

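Run ip route in the terminal to get the address of the database host. Under WSL, the default gateway (the "default via" line) is the Windows host, where PostgreSQL runs in this setup.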

joekakone@UTILISA-GII29BR:~$ ip route
default via 172.31.16.1 dev eth0 proto kernel
172.31.16.0/20 dev eth0 proto kernel scope link src 172.31.27.239
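Rather than reading the address by eye, the "default via" line can be parsed. The function below is a hypothetical sketch that assumes the usual Linux ip route output format shown above; under WSL, the gateway it returns is the Windows host address used as HOST below.

```python
def default_gateway(ip_route_output):
    """Return the gateway address from the "default via ..." line of `ip route`."""
    for line in ip_route_output.splitlines():
        fields = line.split()
        # Expected shape: "default via <gateway> dev <interface> ..."
        if len(fields) >= 3 and fields[0] == "default" and fields[1] == "via":
            return fields[2]
    raise ValueError("no default route found in the ip route output")
```

To use the live output instead of a pasted string: default_gateway(subprocess.run(["ip", "route"], capture_output=True, text=True).stdout).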
## Database info & credentials
HOST = "172.31.16.1" # Windows localhost (Ubuntu WSL)
PORT = "5432"
DATABASE = "postgres"
USERNAME = "postgres"
PASSWORD = "admin"
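Hard-coding the password in the notebook exposes it to anyone who reads the file. One option, sketched below, is to read the settings from environment variables. The names PGHOST, PGPORT, PGDATABASE, PGUSER and PGPASSWORD are borrowed from the libpq conventions; load_db_config is a hypothetical helper, and its defaults simply repeat the values above, except for the password, which has no default.

```python
import os


def load_db_config(environ=None):
    """Read the connection settings from environment variables (hypothetical helper).

    Defaults mirror the notebook values; PGPASSWORD must always be set.
    """
    env = os.environ if environ is None else environ
    password = env.get("PGPASSWORD")
    if password is None:
        raise KeyError("PGPASSWORD is not set")
    return {
        "HOST": env.get("PGHOST", "172.31.16.1"),
        "PORT": env.get("PGPORT", "5432"),
        "DATABASE": env.get("PGDATABASE", "postgres"),
        "USERNAME": env.get("PGUSER", "postgres"),
        "PASSWORD": password,
    }
```

For example, cfg = load_db_config() followed by HOST = cfg["HOST"], PASSWORD = cfg["PASSWORD"], and so on.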

Connection String

postgresql_url = f"jdbc:postgresql://{HOST}:{PORT}/{DATABASE}"
postgresql_long_url = f"jdbc:postgresql://{HOST}:{PORT}/{DATABASE}?user={USERNAME}&password={PASSWORD}"
connection_properties = {
    # "user": USERNAME,
    # "password": PASSWORD,
    "driver": "org.postgresql.Driver"
}
print(postgresql_url)
jdbc:postgresql://172.31.16.1:5432/postgres
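postgresql_long_url embeds the user and password in the URL, so they appear wherever the URL is printed or logged (and a password containing characters such as & or = would likely need URL-encoding). A minimal sketch of the alternative hinted at by the commented-out lines of connection_properties: keep the URL free of credentials and pass them as JDBC properties. jdbc_target is a hypothetical helper name.

```python
def jdbc_target(host, port, database, user, password):
    """Return (url, properties) for spark.read.jdbc / DataFrame.write.jdbc.

    The credentials travel in `properties`, so the URL can be printed safely.
    """
    url = f"jdbc:postgresql://{host}:{port}/{database}"
    properties = {
        "user": user,
        "password": password,
        "driver": "org.postgresql.Driver",
    }
    return url, properties
```

Usage: url, props = jdbc_target(HOST, PORT, DATABASE, USERNAME, PASSWORD), then spark.read.jdbc(url=url, table=table_name, properties=props).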

Note that the SparkSession class used in this notebook is imported from the pyspark.sql module.

Import a table

table_name = "public.covid19_daily_kpi"

df = spark.read.jdbc(url=postgresql_long_url, table=table_name, properties=connection_properties)

df.show()

+----------+-------------+--------------------+--------------------+---------+------+---------+------+--------------------+
|        dt|      country|            latitude|           longitude|confirmed|deaths|recovered|active|       load_datetime|
+----------+-------------+--------------------+--------------------+---------+------+---------+------+--------------------+
|2021-06-18|        Benin|9.307700000000000000|2.315800000000000000|     8140|   103|     7979|    58|2023-06-18 22:56:...|
|2021-06-18| Burkina Faso|12.23830000000000...|-1.56160000000000...|    13460|   167|    13287|     6|2023-06-18 22:56:...|
|2021-06-18|   Cabo Verde|16.53880000000000...|-23.0418000000000...|    31858|   280|    30796|   782|2023-06-18 22:56:...|
|2021-06-18|Cote d'Ivoire|7.540000000000000000|-5.54710000000000...|    47973|   306|    47346|   321|2023-06-18 22:56:...|
|2021-06-18|       Gambia|13.44320000000000...|-15.3101000000000...|     6024|   181|     5827|    16|2023-06-18 22:56:...|
|2021-06-18|        Ghana|7.946500000000000000|-1.02320000000000...|    94824|   790|    92806|  1228|2023-06-18 22:56:...|
|2021-06-18|       Guinea|9.945600000000000000|-9.69660000000000...|    23431|   167|    21488|  1776|2023-06-18 22:56:...|
|2021-06-18|Guinea-Bissau|11.80370000000000...|-15.1804000000000...|     3819|    69|     3553|   197|2023-06-18 22:56:...|
|2021-06-18|      Liberia|6.428055000000000500|-9.42949900000000...|     2729|    95|     2105|   529|2023-06-18 22:56:...|
|2021-06-18|         Mali|17.57069200000000...|-3.99616600000000...|    14364|   523|    10001|  3840|2023-06-18 22:56:...|
|2021-06-18|        Niger|17.60778900000000...|8.081666000000000000|     5457|   193|     5178|    86|2023-06-18 22:56:...|
|2021-06-18|      Nigeria|9.082000000000000000|8.675300000000000000|   167142|  2117|   163535|  1490|2023-06-18 22:56:...|
|2021-06-18|      Senegal|14.49740000000000...|-14.4524000000000...|    42206|  1158|    40707|   341|2023-06-18 22:56:...|
|2021-06-18| Sierra Leone|8.460555000000001000|-11.7798890000000...|     4553|    82|     3208|  1263|2023-06-18 22:56:...|
|2021-06-18|         Togo|8.619500000000000000|0.824800000000000000|    13682|   127|    13334|   221|2023-06-18 22:56:...|
|2021-06-21|        Benin|9.307700000000000000|2.315800000000000000|     8140|   103|     7979|    58|2023-06-21 04:42:...|
|2021-06-21| Burkina Faso|12.23830000000000...|-1.56160000000000...|    13469|   167|    13293|     9|2023-06-21 04:42:...|
|2021-06-21|   Cabo Verde|16.53880000000000...|-23.0418000000000...|    32002|   283|    30988|   731|2023-06-21 04:42:...|
|2021-06-21|Cote d'Ivoire|7.540000000000000000|-5.54710000000000...|    48044|   308|    47445|   291|2023-06-21 04:42:...|
|2021-06-21|       Gambia|13.44320000000000...|-15.3101000000000...|     6024|   181|     5827|    16|2023-06-21 04:42:...|
+----------+-------------+--------------------+--------------------+---------+------+---------+------+--------------------+
only showing top 20 rows
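spark.read.jdbc also accepts a predicates argument: a list of WHERE-clause conditions, each of which becomes one partition of the resulting DataFrame (rows that match none of them are not read). The hypothetical helper below builds one equality predicate per value and doubles single quotes, so that a value like "Cote d'Ivoire" stays a valid SQL literal. It is plain string building, not a parameterized query, so use it only with trusted values.

```python
def equality_predicates(column, values):
    """Build one WHERE-clause predicate per value, e.g. "country = 'Togo'".

    Hypothetical helper: single quotes inside values are doubled (SQL escaping).
    """
    predicates = []
    for value in values:
        literal = str(value).replace("'", "''")
        predicates.append(f"{column} = '{literal}'")
    return predicates
```

For instance, spark.read.jdbc(url=postgresql_long_url, table=table_name, predicates=equality_predicates("country", ["Benin", "Togo"]), properties=connection_properties) should return only the rows for those two countries, spread over two partitions.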


table_name = "public.covid19_daily_kpi"

df = spark.read.format("jdbc") \
    .option("url", postgresql_long_url) \
    .option("dbtable", table_name) \
    .load()

df.show()
+----------+-------------+--------------------+--------------------+---------+------+---------+------+--------------------+
|        dt|      country|            latitude|           longitude|confirmed|deaths|recovered|active|       load_datetime|
+----------+-------------+--------------------+--------------------+---------+------+---------+------+--------------------+
|2021-06-18|        Benin|9.307700000000000000|2.315800000000000000|     8140|   103|     7979|    58|2023-06-18 22:56:...|
|2021-06-18| Burkina Faso|12.23830000000000...|-1.56160000000000...|    13460|   167|    13287|     6|2023-06-18 22:56:...|
|2021-06-18|   Cabo Verde|16.53880000000000...|-23.0418000000000...|    31858|   280|    30796|   782|2023-06-18 22:56:...|
|2021-06-18|Cote d'Ivoire|7.540000000000000000|-5.54710000000000...|    47973|   306|    47346|   321|2023-06-18 22:56:...|
|2021-06-18|       Gambia|13.44320000000000...|-15.3101000000000...|     6024|   181|     5827|    16|2023-06-18 22:56:...|
|2021-06-18|        Ghana|7.946500000000000000|-1.02320000000000...|    94824|   790|    92806|  1228|2023-06-18 22:56:...|
|2021-06-18|       Guinea|9.945600000000000000|-9.69660000000000...|    23431|   167|    21488|  1776|2023-06-18 22:56:...|
|2021-06-18|Guinea-Bissau|11.80370000000000...|-15.1804000000000...|     3819|    69|     3553|   197|2023-06-18 22:56:...|
|2021-06-18|      Liberia|6.428055000000000500|-9.42949900000000...|     2729|    95|     2105|   529|2023-06-18 22:56:...|
|2021-06-18|         Mali|17.57069200000000...|-3.99616600000000...|    14364|   523|    10001|  3840|2023-06-18 22:56:...|
|2021-06-18|        Niger|17.60778900000000...|8.081666000000000000|     5457|   193|     5178|    86|2023-06-18 22:56:...|
|2021-06-18|      Nigeria|9.082000000000000000|8.675300000000000000|   167142|  2117|   163535|  1490|2023-06-18 22:56:...|
|2021-06-18|      Senegal|14.49740000000000...|-14.4524000000000...|    42206|  1158|    40707|   341|2023-06-18 22:56:...|
|2021-06-18| Sierra Leone|8.460555000000001000|-11.7798890000000...|     4553|    82|     3208|  1263|2023-06-18 22:56:...|
|2021-06-18|         Togo|8.619500000000000000|0.824800000000000000|    13682|   127|    13334|   221|2023-06-18 22:56:...|
|2021-06-21|        Benin|9.307700000000000000|2.315800000000000000|     8140|   103|     7979|    58|2023-06-21 04:42:...|
|2021-06-21| Burkina Faso|12.23830000000000...|-1.56160000000000...|    13469|   167|    13293|     9|2023-06-21 04:42:...|
|2021-06-21|   Cabo Verde|16.53880000000000...|-23.0418000000000...|    32002|   283|    30988|   731|2023-06-21 04:42:...|
|2021-06-21|Cote d'Ivoire|7.540000000000000000|-5.54710000000000...|    48044|   308|    47445|   291|2023-06-21 04:42:...|
|2021-06-21|       Gambia|13.44320000000000...|-15.3101000000000...|     6024|   181|     5827|    16|2023-06-21 04:42:...|
+----------+-------------+--------------------+--------------------+---------+------+---------+------+--------------------+
only showing top 20 rows
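The format("jdbc") reader returns the same DataFrame as spark.read.jdbc; the connection settings are simply passed as options. Grouping them in a dictionary makes them reusable with .options(**...). The helper below is a hypothetical sketch; fetchsize (the number of rows fetched per database round trip) is a Spark JDBC option, but the value 1000 is only an illustrative default to tune.

```python
def jdbc_options(url, dbtable, user=None, password=None, fetchsize=1000):
    """Collect the options for spark.read.format("jdbc") (hypothetical helper)."""
    options = {
        "url": url,
        "dbtable": dbtable,
        "driver": "org.postgresql.Driver",
        # Rows fetched per round trip; illustrative value, tune for your data.
        "fetchsize": str(fetchsize),
    }
    # Credentials are optional here because they may already be in the URL.
    if user is not None:
        options["user"] = user
    if password is not None:
        options["password"] = password
    return options
```

Usage: spark.read.format("jdbc").options(**jdbc_options(postgresql_url, table_name, USERNAME, PASSWORD)).load().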


From an SQL query

sql_query = "(select * from public.covid19_daily_kpi) as sq"

df = spark.read.format("jdbc") \
    .option("url", postgresql_url) \
    .option("dbtable", sql_query) \
    .option("user", USERNAME) \
    .option("password", PASSWORD) \
    .load()

df.show()
+----------+-------------+--------------------+--------------------+---------+------+---------+------+--------------------+
|        dt|      country|            latitude|           longitude|confirmed|deaths|recovered|active|       load_datetime|
+----------+-------------+--------------------+--------------------+---------+------+---------+------+--------------------+
|2021-06-18|        Benin|9.307700000000000000|2.315800000000000000|     8140|   103|     7979|    58|2023-06-18 22:56:...|
|2021-06-18| Burkina Faso|12.23830000000000...|-1.56160000000000...|    13460|   167|    13287|     6|2023-06-18 22:56:...|
|2021-06-18|   Cabo Verde|16.53880000000000...|-23.0418000000000...|    31858|   280|    30796|   782|2023-06-18 22:56:...|
|2021-06-18|Cote d'Ivoire|7.540000000000000000|-5.54710000000000...|    47973|   306|    47346|   321|2023-06-18 22:56:...|
|2021-06-18|       Gambia|13.44320000000000...|-15.3101000000000...|     6024|   181|     5827|    16|2023-06-18 22:56:...|
|2021-06-18|        Ghana|7.946500000000000000|-1.02320000000000...|    94824|   790|    92806|  1228|2023-06-18 22:56:...|
|2021-06-18|       Guinea|9.945600000000000000|-9.69660000000000...|    23431|   167|    21488|  1776|2023-06-18 22:56:...|
|2021-06-18|Guinea-Bissau|11.80370000000000...|-15.1804000000000...|     3819|    69|     3553|   197|2023-06-18 22:56:...|
|2021-06-18|      Liberia|6.428055000000000500|-9.42949900000000...|     2729|    95|     2105|   529|2023-06-18 22:56:...|
|2021-06-18|         Mali|17.57069200000000...|-3.99616600000000...|    14364|   523|    10001|  3840|2023-06-18 22:56:...|
|2021-06-18|        Niger|17.60778900000000...|8.081666000000000000|     5457|   193|     5178|    86|2023-06-18 22:56:...|
|2021-06-18|      Nigeria|9.082000000000000000|8.675300000000000000|   167142|  2117|   163535|  1490|2023-06-18 22:56:...|
|2021-06-18|      Senegal|14.49740000000000...|-14.4524000000000...|    42206|  1158|    40707|   341|2023-06-18 22:56:...|
|2021-06-18| Sierra Leone|8.460555000000001000|-11.7798890000000...|     4553|    82|     3208|  1263|2023-06-18 22:56:...|
|2021-06-18|         Togo|8.619500000000000000|0.824800000000000000|    13682|   127|    13334|   221|2023-06-18 22:56:...|
|2021-06-21|        Benin|9.307700000000000000|2.315800000000000000|     8140|   103|     7979|    58|2023-06-21 04:42:...|
|2021-06-21| Burkina Faso|12.23830000000000...|-1.56160000000000...|    13469|   167|    13293|     9|2023-06-21 04:42:...|
|2021-06-21|   Cabo Verde|16.53880000000000...|-23.0418000000000...|    32002|   283|    30988|   731|2023-06-21 04:42:...|
|2021-06-21|Cote d'Ivoire|7.540000000000000000|-5.54710000000000...|    48044|   308|    47445|   291|2023-06-21 04:42:...|
|2021-06-21|       Gambia|13.44320000000000...|-15.3101000000000...|     6024|   181|     5827|    16|2023-06-21 04:42:...|
+----------+-------------+--------------------+--------------------+---------+------+---------+------+--------------------+
only showing top 20 rows
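With dbtable, the query must be wrapped in parentheses and given an alias, because Spark places it in the FROM clause of its own SELECT statements (and PostgreSQL, at least before version 16, requires an alias on such subqueries). A hypothetical helper can do the wrapping and drop a trailing semicolon, which is not allowed inside the parentheses:

```python
def as_subquery(sql, alias="sq"):
    """Wrap a SELECT statement so it can be passed as the "dbtable" option.

    Hypothetical helper: strips surrounding whitespace and a trailing ";".
    """
    body = sql.strip().rstrip(";").strip()
    return f"({body}) as {alias}"
```

Spark also offers a query option that does this wrapping itself, for example .option("query", "select * from public.covid19_daily_kpi"). It cannot be combined with dbtable, nor with the partitionColumn option.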


Export to a database

Once your transformations are done, Spark can export the data to a table in the database for later use.

destination_table = "public.spark_table"

# Column types used when Spark creates the table (Spark SQL type syntax)
column_types = """
dt            date,
country       varchar(200),
latitude      numeric,
longitude     numeric,
confirmed     integer,
deaths        integer,
recovered     integer,
active        integer,
load_datetime timestamp
"""

# Create the table, then insert the DataFrame rows into it
df.write \
    .option("createTableColumnTypes", column_types) \
    .jdbc(postgresql_long_url, destination_table)
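The createTableColumnTypes string is parsed by Spark as Spark SQL types rather than passed through as raw PostgreSQL DDL, and a column name that does not exist in the DataFrame makes the write fail. Building the string from a dictionary keeps the name/type pairs readable; create_table_column_types is a hypothetical helper:

```python
def create_table_column_types(column_types):
    """Join {column: type} pairs into a createTableColumnTypes string.

    Hypothetical helper: types must be valid Spark SQL types, e.g. date,
    varchar(200), decimal(38,18), integer, timestamp.
    """
    if not column_types:
        raise ValueError("at least one column is required")
    return ", ".join(f"{name} {sql_type}" for name, sql_type in column_types.items())
```

Note that the default save mode is errorifexists, so running the write cell a second time fails once the table exists; .mode("overwrite") recreates the table (with the types given) and .mode("append") adds rows to it.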

To check that the export succeeded, we read the table back:

df = spark.read.format("jdbc") \
    .option("url", postgresql_long_url) \
    .option("dbtable", destination_table) \
    .load()

df.show()
+----------+-------------+--------+---------+---------+------+---------+------+--------------------+
|        dt|      country|latitude|longitude|confirmed|deaths|recovered|active|       load_datetime|
+----------+-------------+--------+---------+---------+------+---------+------+--------------------+
|2021-06-18|        Benin|       9|        2|     8140|   103|     7979|    58|2023-06-18 22:56:...|
|2021-06-18| Burkina Faso|      12|       -2|    13460|   167|    13287|     6|2023-06-18 22:56:...|
|2021-06-18|   Cabo Verde|      17|      -23|    31858|   280|    30796|   782|2023-06-18 22:56:...|
|2021-06-18|Cote d'Ivoire|       8|       -6|    47973|   306|    47346|   321|2023-06-18 22:56:...|
|2021-06-18|       Gambia|      13|      -15|     6024|   181|     5827|    16|2023-06-18 22:56:...|
|2021-06-18|        Ghana|       8|       -1|    94824|   790|    92806|  1228|2023-06-18 22:56:...|
|2021-06-18|       Guinea|      10|      -10|    23431|   167|    21488|  1776|2023-06-18 22:56:...|
|2021-06-18|Guinea-Bissau|      12|      -15|     3819|    69|     3553|   197|2023-06-18 22:56:...|
|2021-06-18|      Liberia|       6|       -9|     2729|    95|     2105|   529|2023-06-18 22:56:...|
|2021-06-18|         Mali|      18|       -4|    14364|   523|    10001|  3840|2023-06-18 22:56:...|
|2021-06-18|        Niger|      18|        8|     5457|   193|     5178|    86|2023-06-18 22:56:...|
|2021-06-18|      Nigeria|       9|        9|   167142|  2117|   163535|  1490|2023-06-18 22:56:...|
|2021-06-18|      Senegal|      14|      -14|    42206|  1158|    40707|   341|2023-06-18 22:56:...|
|2021-06-18| Sierra Leone|       8|      -12|     4553|    82|     3208|  1263|2023-06-18 22:56:...|
|2021-06-18|         Togo|       9|        1|    13682|   127|    13334|   221|2023-06-18 22:56:...|
|2021-06-21|        Benin|       9|        2|     8140|   103|     7979|    58|2023-06-21 04:42:...|
|2021-06-21| Burkina Faso|      12|       -2|    13469|   167|    13293|     9|2023-06-21 04:42:...|
|2021-06-21|   Cabo Verde|      17|      -23|    32002|   283|    30988|   731|2023-06-21 04:42:...|
|2021-06-21|Cote d'Ivoire|       8|       -6|    48044|   308|    47445|   291|2023-06-21 04:42:...|
|2021-06-21|       Gambia|      13|      -15|     6024|   181|     5827|    16|2023-06-21 04:42:...|
+----------+-------------+--------+---------+---------+------+---------+------+--------------------+
only showing top 20 rows
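In the output above, latitude and longitude come back as whole numbers (Benin's latitude 9.3077 became 9, Burkina Faso's longitude -1.5616 became -2). A likely cause: Spark reads a bare numeric in createTableColumnTypes as its default decimal type, decimal(10,0), so the column is created with a scale of 0 and PostgreSQL rounds each value on insert. The sketch below only mimics that rounding with Python's decimal module (store_with_scale is a hypothetical name); it does not talk to Spark or PostgreSQL.

```python
from decimal import ROUND_HALF_UP, Decimal

# Coordinates taken from the first rows of the source table.
coordinates = {
    "Benin latitude": Decimal("9.3077"),
    "Burkina Faso longitude": Decimal("-1.5616"),
    "Cabo Verde latitude": Decimal("16.5388"),
}


def store_with_scale(value, scale):
    """Mimic storing `value` in a NUMERIC column with the given scale.

    PostgreSQL rounds to the column scale with ties away from zero,
    which is what ROUND_HALF_UP does in Python's decimal module.
    """
    return value.quantize(Decimal(1).scaleb(-scale), rounding=ROUND_HALF_UP)


for label, value in coordinates.items():
    # Scale 0 (what the exported table got) vs scale 18 (the source precision)
    print(label, store_with_scale(value, 0), store_with_scale(value, 18))
```

To keep the coordinates intact, declare the scale explicitly in the column-type string, for example latitude decimal(38,18) and longitude decimal(38,18), which matches the 18 decimal places visible in the source output, then rewrite the table with .mode("overwrite").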


Stop the Spark session

Once our work is done, we stop the session to release its resources.

spark.stop()

In the next section, we will see how to import data from files.