Aller au contenu

Import et Export#

Spark peut lire des données depuis des sources externes tels que les fichiers CSV, JSON, Parquet ou encore HDFS.

Pré-requis#

Afin d'accéder à la base de données Postgres, nous aurons besoin du driver Postgres. Rendez-vous sur la page de téléchargement Download pgJDBC.

import warnings
warnings.filterwarnings("ignore")  # Ignore warnings coming from Arrow optimizations.

Importer PySpark#

import os
import urllib
import pyspark
from pyspark.sql import SparkSession
from pyspark.context import SparkContext

Créer une session Spark#

Nous devons importer la classe SparkSession depuis le module pyspark.sql

# Créer une session Spark
spark = SparkSession.builder \
    .appName("Get Started") \
    .getOrCreate()
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/12 17:34:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/05/12 17:34:15 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.

Importer depuis un fichier CSV#

URL = "https://raw.githubusercontent.com/joekakone/datasets/master/datasets/StudentsPerformance.csv"
FILENAME = "StudentsPerformance.csv"

urllib.request.urlretrieve(url=URL, filename=FILENAME)
('StudentsPerformance.csv', <http.client.HTTPMessage at 0x7fac0fe66aa0>)

Nous allons préparer une petit dataset, la liste des colonnes et les enregistrements comme liste de tuples

df = spark.read.csv(path=FILENAME, header=True, inferSchema=True)
df.show(10)
+------+--------------+---------------------------+------------+-----------------------+----------+-------------+-------------+
|gender|race/ethnicity|parental level of education|       lunch|test preparation course|math score|reading score|writing score|
+------+--------------+---------------------------+------------+-----------------------+----------+-------------+-------------+
|female|       group B|          bachelor's degree|    standard|                   none|        72|           72|           74|
|female|       group C|               some college|    standard|              completed|        69|           90|           88|
|female|       group B|            master's degree|    standard|                   none|        90|           95|           93|
|  male|       group A|         associate's degree|free/reduced|                   none|        47|           57|           44|
|  male|       group C|               some college|    standard|                   none|        76|           78|           75|
|female|       group B|         associate's degree|    standard|                   none|        71|           83|           78|
|female|       group B|               some college|    standard|              completed|        88|           95|           92|
|  male|       group B|               some college|free/reduced|                   none|        40|           43|           39|
|  male|       group D|                high school|free/reduced|              completed|        64|           64|           67|
|female|       group B|                high school|free/reduced|                   none|        38|           60|           50|
+------+--------------+---------------------------+------------+-----------------------+----------+-------------+-------------+
only showing top 10 rows


Nous allons créer le dataframe à partir du dataset

Afficher le schéma#

Le schéma d'un DataFrame en PySpark est une représentation tabulaire qui expose la structure des données, incluant les noms des colonnes, les types de données associés à chaque colonne, et d'autres informations pertinentes. La méthode printSchema permet d'afficher le schéma.

df.printSchema()
root
 |-- gender: string (nullable = true)
 |-- race/ethnicity: string (nullable = true)
 |-- parental level of education: string (nullable = true)
 |-- lunch: string (nullable = true)
 |-- test preparation course: string (nullable = true)
 |-- math score: integer (nullable = true)
 |-- reading score: integer (nullable = true)
 |-- writing score: integer (nullable = true)


Lire un fichier à partir d'une URL#

from pyspark import SparkFiles

sc = spark.sparkContext

sc.addFile(URL)

path = SparkFiles.get(os.path.split(URL)[-1])

df = spark.read.csv("file://" + path, header=True, inferSchema=True, sep=",")

df.show()
+------+--------------+---------------------------+------------+-----------------------+----------+-------------+-------------+
|gender|race/ethnicity|parental level of education|       lunch|test preparation course|math score|reading score|writing score|
+------+--------------+---------------------------+------------+-----------------------+----------+-------------+-------------+
|female|       group B|          bachelor's degree|    standard|                   none|        72|           72|           74|
|female|       group C|               some college|    standard|              completed|        69|           90|           88|
|female|       group B|            master's degree|    standard|                   none|        90|           95|           93|
|  male|       group A|         associate's degree|free/reduced|                   none|        47|           57|           44|
|  male|       group C|               some college|    standard|                   none|        76|           78|           75|
|female|       group B|         associate's degree|    standard|                   none|        71|           83|           78|
|female|       group B|               some college|    standard|              completed|        88|           95|           92|
|  male|       group B|               some college|free/reduced|                   none|        40|           43|           39|
|  male|       group D|                high school|free/reduced|              completed|        64|           64|           67|
|female|       group B|                high school|free/reduced|                   none|        38|           60|           50|
|  male|       group C|         associate's degree|    standard|                   none|        58|           54|           52|
|  male|       group D|         associate's degree|    standard|                   none|        40|           52|           43|
|female|       group B|                high school|    standard|                   none|        65|           81|           73|
|  male|       group A|               some college|    standard|              completed|        78|           72|           70|
|female|       group A|            master's degree|    standard|                   none|        50|           53|           58|
|female|       group C|           some high school|    standard|                   none|        69|           75|           78|
|  male|       group C|                high school|    standard|                   none|        88|           89|           86|
|female|       group B|           some high school|free/reduced|                   none|        18|           32|           28|
|  male|       group C|            master's degree|free/reduced|              completed|        46|           42|           46|
|female|       group C|         associate's degree|free/reduced|                   none|        54|           58|           61|
+------+--------------+---------------------------+------------+-----------------------+----------+-------------+-------------+
only showing top 20 rows


Exporter vers un fichier CSV#

On fait appel à la méthode .show() pour afficher le dataframe.

OUTPUT_FILE = "StudentsPerformanceOutput.csv"

df.write.csv(path=OUTPUT_FILE, mode="overwrite")

Exporter vers un fichier Parquet#

Apache Parquet est un format de fichier de données open source en colonnes, conçu pour stocker et récupérer des données avec une grande efficacité. Pour en savoir plus, merci de consulter ce lien Parquet, qu'est-ce que c'est ?

OUTPUT_FILE = "StudentsPerformance.parquet"

df.write.parquet(path=OUTPUT_FILE, mode="overwrite", partitionBy="race/ethnicity")
                                                                                

Fermer la session Spark#

Une fois notre travail terminé, nous devons fermer la session.

spark.stop()

Dans la prochaine section, nous verrons comment importer des données depuis des fichiers.