
Interacting with a Database#

In this section, we explore how Spark connects to a relational database over JDBC, using Postgres as the example.

Prerequisites#

To access the Postgres database, we need the Postgres JDBC driver. Go to the Download pgJDBC page and download the driver JAR. Once the download has finished, place the file in a folder of your choice that is easy to reach from the notebook.

import warnings
warnings.filterwarnings("ignore")  # Ignore warnings coming from Arrow optimizations.

Import PySpark#

import pyspark
from pyspark.sql import SparkSession
pyspark.__version__
'3.5.1'

Create a Spark Session#

# Create a Spark session with the Postgres driver on the driver and executor classpaths
spark = SparkSession.builder \
    .appName("Work with PostgreSQL") \
    .config("spark.driver.extraClassPath", "/home/joekakone/spark/drivers/postgresql-42.7.3.jar") \
    .config("spark.executor.extraClassPath", "/home/joekakone/spark/drivers/postgresql-42.7.3.jar") \
    .getOrCreate()
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/12 17:32:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/05/12 17:32:20 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
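
The session above repeats the driver path in two settings, and a wrong path only shows up later as a missing-driver error. The following is a minimal sketch, not part of the original notebook, that checks the JAR exists and then applies both settings. The helper names `driver_classpath_conf` and `build_session` are hypothetical.

```python
from pathlib import Path


def driver_classpath_conf(jar_path: str) -> dict:
    """Return the two classpath settings used above, after checking the JAR exists."""
    jar = Path(jar_path).expanduser()
    if not jar.is_file():
        raise FileNotFoundError(f"PostgreSQL JDBC driver not found: {jar}")
    return {
        "spark.driver.extraClassPath": str(jar),
        "spark.executor.extraClassPath": str(jar),
    }


def build_session(builder, jar_path: str):
    """Apply the classpath settings to a SparkSession.builder and start the session."""
    for key, value in driver_classpath_conf(jar_path).items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

With PySpark imported as above, this could be called as `spark = build_session(SparkSession.builder.appName("Work with PostgreSQL"), "/home/joekakone/spark/drivers/postgresql-42.7.3.jar")`.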

Database Config#

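Run ip route in the terminal to find the address of the database host. Here Postgres runs on Windows and the notebook runs in Ubuntu under WSL, so the host is the default gateway.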

joekakone@UTILISA-GII29BR:~$ ip route
default via 172.31.16.1 dev eth0 proto kernel
172.31.16.0/20 dev eth0 proto kernel scope link src 172.31.27.239
## Database Infos & Credentials
HOST = "172.31.16.1" # Windows localhost (Ubuntu WSL)
PORT = "5432"
DATABASE = "postgres"
USERNAME = "postgres"
PASSWORD = "admin"
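
Instead of copying the address by hand, the gateway can be read from the `ip route` output. This is a small sketch under the same assumption as above (the database host is the default gateway, as is typical for WSL with its default networking); the function names are hypothetical.

```python
import subprocess


def default_gateway(ip_route_output: str) -> str:
    """Return the address that follows 'default via' in the output of `ip route`."""
    for line in ip_route_output.splitlines():
        fields = line.split()
        if len(fields) >= 3 and fields[:2] == ["default", "via"]:
            return fields[2]
    raise ValueError("no default route found")


def detect_host() -> str:
    """Run `ip route` (Linux or WSL only) and return the default gateway."""
    result = subprocess.run(
        ["ip", "route"], capture_output=True, text=True, check=True
    )
    return default_gateway(result.stdout)
```

`HOST = detect_host()` would then replace the hard-coded address; on a setup where Postgres runs elsewhere, the address still has to be set manually.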

Connection String#

postgresql_url = f"jdbc:postgresql://{HOST}:{PORT}/{DATABASE}"
postgresql_long_url = f"jdbc:postgresql://{HOST}:{PORT}/{DATABASE}?user={USERNAME}&password={PASSWORD}"
connection_properties = {
    # "user": USERNAME,
    # "password": PASSWORD,
    "driver": "org.postgresql.Driver"
}
print(postgresql_url)
jdbc:postgresql://172.31.16.1:5432/postgres
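
The long URL above embeds the password, which can then leak into logs or notebook output. As a hedged alternative, the sketch below keeps the URL free of credentials and passes them through the properties dictionary, reading them from environment variables. The variable names `PGUSER` and `PGPASSWORD` and the helper `jdbc_settings` are assumptions chosen for illustration.

```python
import os


def jdbc_settings(host, port, database, user_env="PGUSER", password_env="PGPASSWORD"):
    """Build a credential-free JDBC URL and a properties dict read from the environment."""
    url = f"jdbc:postgresql://{host}:{port}/{database}"
    properties = {
        "user": os.environ[user_env],          # raises KeyError if the variable is unset
        "password": os.environ[password_env],
        "driver": "org.postgresql.Driver",
    }
    return url, properties
```

Usage would look like `url, props = jdbc_settings(HOST, PORT, DATABASE)` followed by `spark.read.jdbc(url=url, table=table_name, properties=props)`.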

The reads below go through the SparkSession class imported earlier from the pyspark.sql module.

Import a Table#

table_name = "public.covid19_daily_kpi"

df = spark.read.jdbc(url=postgresql_long_url, table=table_name, properties=connection_properties)

df.show()
                                                                                

+----------+-------------+--------------------+--------------------+---------+------+---------+------+--------------------+
|        dt|      country|            latitude|           longitude|confirmed|deaths|recovered|active|       load_datetime|
+----------+-------------+--------------------+--------------------+---------+------+---------+------+--------------------+
|2021-06-18|        Benin|9.307700000000000000|2.315800000000000000|     8140|   103|     7979|    58|2023-06-18 22:56:...|
|2021-06-18| Burkina Faso|12.23830000000000...|-1.56160000000000...|    13460|   167|    13287|     6|2023-06-18 22:56:...|
|2021-06-18|   Cabo Verde|16.53880000000000...|-23.0418000000000...|    31858|   280|    30796|   782|2023-06-18 22:56:...|
|2021-06-18|Cote d'Ivoire|7.540000000000000000|-5.54710000000000...|    47973|   306|    47346|   321|2023-06-18 22:56:...|
|2021-06-18|       Gambia|13.44320000000000...|-15.3101000000000...|     6024|   181|     5827|    16|2023-06-18 22:56:...|
|2021-06-18|        Ghana|7.946500000000000000|-1.02320000000000...|    94824|   790|    92806|  1228|2023-06-18 22:56:...|
|2021-06-18|       Guinea|9.945600000000000000|-9.69660000000000...|    23431|   167|    21488|  1776|2023-06-18 22:56:...|
|2021-06-18|Guinea-Bissau|11.80370000000000...|-15.1804000000000...|     3819|    69|     3553|   197|2023-06-18 22:56:...|
|2021-06-18|      Liberia|6.428055000000000500|-9.42949900000000...|     2729|    95|     2105|   529|2023-06-18 22:56:...|
|2021-06-18|         Mali|17.57069200000000...|-3.99616600000000...|    14364|   523|    10001|  3840|2023-06-18 22:56:...|
|2021-06-18|        Niger|17.60778900000000...|8.081666000000000000|     5457|   193|     5178|    86|2023-06-18 22:56:...|
|2021-06-18|      Nigeria|9.082000000000000000|8.675300000000000000|   167142|  2117|   163535|  1490|2023-06-18 22:56:...|
|2021-06-18|      Senegal|14.49740000000000...|-14.4524000000000...|    42206|  1158|    40707|   341|2023-06-18 22:56:...|
|2021-06-18| Sierra Leone|8.460555000000001000|-11.7798890000000...|     4553|    82|     3208|  1263|2023-06-18 22:56:...|
|2021-06-18|         Togo|8.619500000000000000|0.824800000000000000|    13682|   127|    13334|   221|2023-06-18 22:56:...|
|2021-06-21|        Benin|9.307700000000000000|2.315800000000000000|     8140|   103|     7979|    58|2023-06-21 04:42:...|
|2021-06-21| Burkina Faso|12.23830000000000...|-1.56160000000000...|    13469|   167|    13293|     9|2023-06-21 04:42:...|
|2021-06-21|   Cabo Verde|16.53880000000000...|-23.0418000000000...|    32002|   283|    30988|   731|2023-06-21 04:42:...|
|2021-06-21|Cote d'Ivoire|7.540000000000000000|-5.54710000000000...|    48044|   308|    47445|   291|2023-06-21 04:42:...|
|2021-06-21|       Gambia|13.44320000000000...|-15.3101000000000...|     6024|   181|     5827|    16|2023-06-21 04:42:...|
+----------+-------------+--------------------+--------------------+---------+------+---------+------+--------------------+
only showing top 20 rows


table_name = "public.covid19_daily_kpi"

df = spark.read.format("jdbc") \
    .option("url", postgresql_long_url) \
    .option("dbtable", table_name) \
    .load()

df.show()
+----------+-------------+--------------------+--------------------+---------+------+---------+------+--------------------+
|        dt|      country|            latitude|           longitude|confirmed|deaths|recovered|active|       load_datetime|
+----------+-------------+--------------------+--------------------+---------+------+---------+------+--------------------+
|2021-06-18|        Benin|9.307700000000000000|2.315800000000000000|     8140|   103|     7979|    58|2023-06-18 22:56:...|
|2021-06-18| Burkina Faso|12.23830000000000...|-1.56160000000000...|    13460|   167|    13287|     6|2023-06-18 22:56:...|
|2021-06-18|   Cabo Verde|16.53880000000000...|-23.0418000000000...|    31858|   280|    30796|   782|2023-06-18 22:56:...|
|2021-06-18|Cote d'Ivoire|7.540000000000000000|-5.54710000000000...|    47973|   306|    47346|   321|2023-06-18 22:56:...|
|2021-06-18|       Gambia|13.44320000000000...|-15.3101000000000...|     6024|   181|     5827|    16|2023-06-18 22:56:...|
|2021-06-18|        Ghana|7.946500000000000000|-1.02320000000000...|    94824|   790|    92806|  1228|2023-06-18 22:56:...|
|2021-06-18|       Guinea|9.945600000000000000|-9.69660000000000...|    23431|   167|    21488|  1776|2023-06-18 22:56:...|
|2021-06-18|Guinea-Bissau|11.80370000000000...|-15.1804000000000...|     3819|    69|     3553|   197|2023-06-18 22:56:...|
|2021-06-18|      Liberia|6.428055000000000500|-9.42949900000000...|     2729|    95|     2105|   529|2023-06-18 22:56:...|
|2021-06-18|         Mali|17.57069200000000...|-3.99616600000000...|    14364|   523|    10001|  3840|2023-06-18 22:56:...|
|2021-06-18|        Niger|17.60778900000000...|8.081666000000000000|     5457|   193|     5178|    86|2023-06-18 22:56:...|
|2021-06-18|      Nigeria|9.082000000000000000|8.675300000000000000|   167142|  2117|   163535|  1490|2023-06-18 22:56:...|
|2021-06-18|      Senegal|14.49740000000000...|-14.4524000000000...|    42206|  1158|    40707|   341|2023-06-18 22:56:...|
|2021-06-18| Sierra Leone|8.460555000000001000|-11.7798890000000...|     4553|    82|     3208|  1263|2023-06-18 22:56:...|
|2021-06-18|         Togo|8.619500000000000000|0.824800000000000000|    13682|   127|    13334|   221|2023-06-18 22:56:...|
|2021-06-21|        Benin|9.307700000000000000|2.315800000000000000|     8140|   103|     7979|    58|2023-06-21 04:42:...|
|2021-06-21| Burkina Faso|12.23830000000000...|-1.56160000000000...|    13469|   167|    13293|     9|2023-06-21 04:42:...|
|2021-06-21|   Cabo Verde|16.53880000000000...|-23.0418000000000...|    32002|   283|    30988|   731|2023-06-21 04:42:...|
|2021-06-21|Cote d'Ivoire|7.540000000000000000|-5.54710000000000...|    48044|   308|    47445|   291|2023-06-21 04:42:...|
|2021-06-21|       Gambia|13.44320000000000...|-15.3101000000000...|     6024|   181|     5827|    16|2023-06-21 04:42:...|
+----------+-------------+--------------------+--------------------+---------+------+---------+------+--------------------+
only showing top 20 rows


From a SQL Query#

sql_query = "(select * from public.covid19_daily_kpi) as sq"

df = spark.read.format("jdbc") \
    .option("url", postgresql_url) \
    .option("dbtable", sql_query) \
    .option("user", USERNAME) \
    .option("password", PASSWORD) \
    .load()

df.show()
+----------+-------------+--------------------+--------------------+---------+------+---------+------+--------------------+
|        dt|      country|            latitude|           longitude|confirmed|deaths|recovered|active|       load_datetime|
+----------+-------------+--------------------+--------------------+---------+------+---------+------+--------------------+
|2021-06-18|        Benin|9.307700000000000000|2.315800000000000000|     8140|   103|     7979|    58|2023-06-18 22:56:...|
|2021-06-18| Burkina Faso|12.23830000000000...|-1.56160000000000...|    13460|   167|    13287|     6|2023-06-18 22:56:...|
|2021-06-18|   Cabo Verde|16.53880000000000...|-23.0418000000000...|    31858|   280|    30796|   782|2023-06-18 22:56:...|
|2021-06-18|Cote d'Ivoire|7.540000000000000000|-5.54710000000000...|    47973|   306|    47346|   321|2023-06-18 22:56:...|
|2021-06-18|       Gambia|13.44320000000000...|-15.3101000000000...|     6024|   181|     5827|    16|2023-06-18 22:56:...|
|2021-06-18|        Ghana|7.946500000000000000|-1.02320000000000...|    94824|   790|    92806|  1228|2023-06-18 22:56:...|
|2021-06-18|       Guinea|9.945600000000000000|-9.69660000000000...|    23431|   167|    21488|  1776|2023-06-18 22:56:...|
|2021-06-18|Guinea-Bissau|11.80370000000000...|-15.1804000000000...|     3819|    69|     3553|   197|2023-06-18 22:56:...|
|2021-06-18|      Liberia|6.428055000000000500|-9.42949900000000...|     2729|    95|     2105|   529|2023-06-18 22:56:...|
|2021-06-18|         Mali|17.57069200000000...|-3.99616600000000...|    14364|   523|    10001|  3840|2023-06-18 22:56:...|
|2021-06-18|        Niger|17.60778900000000...|8.081666000000000000|     5457|   193|     5178|    86|2023-06-18 22:56:...|
|2021-06-18|      Nigeria|9.082000000000000000|8.675300000000000000|   167142|  2117|   163535|  1490|2023-06-18 22:56:...|
|2021-06-18|      Senegal|14.49740000000000...|-14.4524000000000...|    42206|  1158|    40707|   341|2023-06-18 22:56:...|
|2021-06-18| Sierra Leone|8.460555000000001000|-11.7798890000000...|     4553|    82|     3208|  1263|2023-06-18 22:56:...|
|2021-06-18|         Togo|8.619500000000000000|0.824800000000000000|    13682|   127|    13334|   221|2023-06-18 22:56:...|
|2021-06-21|        Benin|9.307700000000000000|2.315800000000000000|     8140|   103|     7979|    58|2023-06-21 04:42:...|
|2021-06-21| Burkina Faso|12.23830000000000...|-1.56160000000000...|    13469|   167|    13293|     9|2023-06-21 04:42:...|
|2021-06-21|   Cabo Verde|16.53880000000000...|-23.0418000000000...|    32002|   283|    30988|   731|2023-06-21 04:42:...|
|2021-06-21|Cote d'Ivoire|7.540000000000000000|-5.54710000000000...|    48044|   308|    47445|   291|2023-06-21 04:42:...|
|2021-06-21|       Gambia|13.44320000000000...|-15.3101000000000...|     6024|   181|     5827|    16|2023-06-21 04:42:...|
+----------+-------------+--------------------+--------------------+---------+------+---------+------+--------------------+
only showing top 20 rows
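
The `dbtable` option expects something that can appear in a FROM clause, which is why the query above is wrapped in parentheses and given an alias. A minimal sketch of that wrapping follows; `as_subquery` and `read_query` are hypothetical helpers, and `read_query` simply mirrors the options used above.

```python
def as_subquery(sql: str, alias: str = "sq") -> str:
    """Wrap a SELECT statement so it can be passed to the JDBC `dbtable` option."""
    body = sql.strip().rstrip(";").strip()  # a trailing semicolon would break the subquery
    if not body:
        raise ValueError("empty query")
    return f"({body}) as {alias}"


def read_query(spark, url, sql, user, password):
    """Read the result of `sql` through JDBC using an aliased subquery."""
    return (
        spark.read.format("jdbc")
        .option("url", url)
        .option("dbtable", as_subquery(sql))
        .option("user", user)
        .option("password", password)
        .load()
    )
```

Spark's JDBC source also documents a `query` option that takes the bare statement and does the wrapping itself; it cannot be combined with `dbtable`.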


Export to a Database#

Once you have finished transforming the data, Spark can write the result to a table in the database for later use.

destination_table = "public.spark_table"

# Column types to use when Spark creates the destination table
column_types = """
dt            date,
country       varchar(200),
latitude      numeric,
longitude     numeric,
confirmed     integer,
deaths        integer,
recovered     integer,
active        integer,
load_datetime timestamp
"""

df.write \
    .option("createTableColumnTypes", column_types) \
    .jdbc(postgresql_long_url, destination_table)

To check that the export succeeded, we read the table back.

df = spark.read.format("jdbc") \
    .option("url", postgresql_long_url) \
    .option("dbtable", destination_table) \
    .load()

df.show()
+----------+-------------+--------+---------+---------+------+---------+------+--------------------+
|        dt|      country|latitude|longitude|confirmed|deaths|recovered|active|       load_datetime|
+----------+-------------+--------+---------+---------+------+---------+------+--------------------+
|2021-06-18|        Benin|       9|        2|     8140|   103|     7979|    58|2023-06-18 22:56:...|
|2021-06-18| Burkina Faso|      12|       -2|    13460|   167|    13287|     6|2023-06-18 22:56:...|
|2021-06-18|   Cabo Verde|      17|      -23|    31858|   280|    30796|   782|2023-06-18 22:56:...|
|2021-06-18|Cote d'Ivoire|       8|       -6|    47973|   306|    47346|   321|2023-06-18 22:56:...|
|2021-06-18|       Gambia|      13|      -15|     6024|   181|     5827|    16|2023-06-18 22:56:...|
|2021-06-18|        Ghana|       8|       -1|    94824|   790|    92806|  1228|2023-06-18 22:56:...|
|2021-06-18|       Guinea|      10|      -10|    23431|   167|    21488|  1776|2023-06-18 22:56:...|
|2021-06-18|Guinea-Bissau|      12|      -15|     3819|    69|     3553|   197|2023-06-18 22:56:...|
|2021-06-18|      Liberia|       6|       -9|     2729|    95|     2105|   529|2023-06-18 22:56:...|
|2021-06-18|         Mali|      18|       -4|    14364|   523|    10001|  3840|2023-06-18 22:56:...|
|2021-06-18|        Niger|      18|        8|     5457|   193|     5178|    86|2023-06-18 22:56:...|
|2021-06-18|      Nigeria|       9|        9|   167142|  2117|   163535|  1490|2023-06-18 22:56:...|
|2021-06-18|      Senegal|      14|      -14|    42206|  1158|    40707|   341|2023-06-18 22:56:...|
|2021-06-18| Sierra Leone|       8|      -12|     4553|    82|     3208|  1263|2023-06-18 22:56:...|
|2021-06-18|         Togo|       9|        1|    13682|   127|    13334|   221|2023-06-18 22:56:...|
|2021-06-21|        Benin|       9|        2|     8140|   103|     7979|    58|2023-06-21 04:42:...|
|2021-06-21| Burkina Faso|      12|       -2|    13469|   167|    13293|     9|2023-06-21 04:42:...|
|2021-06-21|   Cabo Verde|      17|      -23|    32002|   283|    30988|   731|2023-06-21 04:42:...|
|2021-06-21|Cote d'Ivoire|       8|       -6|    48044|   308|    47445|   291|2023-06-21 04:42:...|
|2021-06-21|       Gambia|      13|      -15|     6024|   181|     5827|    16|2023-06-21 04:42:...|
+----------+-------------+--------+---------+---------+------+---------+------+--------------------+
only showing top 20 rows
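
In the output above, latitude and longitude come back as whole numbers, whereas the source table showed 18 decimal places. One plausible cause, stated here as an assumption rather than something the notebook confirms, is that Spark reads a bare numeric in createTableColumnTypes as a decimal with zero scale. The sketch below spells out the precision instead; `column_types_ddl`, `COLUMN_TYPES`, and `export` are hypothetical names, and decimal(38,18) is chosen only to match the scale seen when reading the source table.

```python
def column_types_ddl(types: dict) -> str:
    """Join {column: type} pairs into the string expected by createTableColumnTypes."""
    return ", ".join(f"{name} {sql_type}" for name, sql_type in types.items())


# Only the columns that need a non-default type are listed; Spark picks defaults for the rest.
COLUMN_TYPES = {
    "country": "varchar(200)",
    "latitude": "decimal(38,18)",   # assumption: keep 18 decimal places
    "longitude": "decimal(38,18)",
}


def export(df, url, table, mode="overwrite"):
    """Write df to `table`; mode='overwrite' replaces an existing table instead of failing."""
    (
        df.write
        .option("createTableColumnTypes", column_types_ddl(COLUMN_TYPES))
        .mode(mode)
        .jdbc(url, table)
    )
```

The explicit `mode` also matters when a cell is rerun: without it, the write fails if the destination table already exists.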


Close the Spark Session#

Once the work is done, we stop the session to release its resources.

spark.stop()

In the next section, we will see how to import data from files.