Aller au contenu

Import et Export

Spark peut lire des données depuis des sources externes tels que les fichiers CSV, JSON, Parquet ou encore HDFS.

Pré-requis

Afin d'accéder à la base de données Postgres, nous aurons besoin du driver Postgres. Rendez-vous sur la page de téléchargement Download pgJDBC.

import warnings
warnings.filterwarnings("ignore")  # Ignore warnings coming from Arrow optimizations.

Importer PySpark

import os
import urllib
import pyspark
from pyspark.sql import SparkSession
from pyspark.context import SparkContext

Créer une session Spark

Nous devons importer la classe SparkSession depuis le module pyspark.sql

# Créer une session Spark
spark = SparkSession.builder \
    .appName("Get Started") \
    .getOrCreate()
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/12 17:34:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/05/12 17:34:15 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.

Importer depuis un fichier CSV

URL = "https://raw.githubusercontent.com/joekakone/datasets/master/datasets/StudentsPerformance.csv"
FILENAME = "StudentsPerformance.csv"

urllib.request.urlretrieve(url=URL, filename=FILENAME)
('StudentsPerformance.csv', <http.client.HTTPMessage at 0x7fac0fe66aa0>)

Nous allons préparer une petit dataset, la liste des colonnes et les enregistrements comme liste de tuples

df = spark.read.csv(path=FILENAME, header=True, inferSchema=True)
df.show(10)
+------+--------------+---------------------------+------------+-----------------------+----------+-------------+-------------+
|gender|race/ethnicity|parental level of education|       lunch|test preparation course|math score|reading score|writing score|
+------+--------------+---------------------------+------------+-----------------------+----------+-------------+-------------+
|female|       group B|          bachelor's degree|    standard|                   none|        72|           72|           74|
|female|       group C|               some college|    standard|              completed|        69|           90|           88|
|female|       group B|            master's degree|    standard|                   none|        90|           95|           93|
|  male|       group A|         associate's degree|free/reduced|                   none|        47|           57|           44|
|  male|       group C|               some college|    standard|                   none|        76|           78|           75|
|female|       group B|         associate's degree|    standard|                   none|        71|           83|           78|
|female|       group B|               some college|    standard|              completed|        88|           95|           92|
|  male|       group B|               some college|free/reduced|                   none|        40|           43|           39|
|  male|       group D|                high school|free/reduced|              completed|        64|           64|           67|
|female|       group B|                high school|free/reduced|                   none|        38|           60|           50|
+------+--------------+---------------------------+------------+-----------------------+----------+-------------+-------------+
only showing top 10 rows


Nous allons créer le dataframe à partir du dataset

Afficher le schéma

Le schéma d'un DataFrame en PySpark est une représentation tabulaire qui expose la structure des données, incluant les noms des colonnes, les types de données associés à chaque colonne, et d'autres informations pertinentes. La méthode printSchema permet d'afficher le schéma.

df.printSchema()
root
 |-- gender: string (nullable = true)
 |-- race/ethnicity: string (nullable = true)
 |-- parental level of education: string (nullable = true)
 |-- lunch: string (nullable = true)
 |-- test preparation course: string (nullable = true)
 |-- math score: integer (nullable = true)
 |-- reading score: integer (nullable = true)
 |-- writing score: integer (nullable = true)


Lire un fichier à partir d'une URL

from pyspark import SparkFiles

sc = spark.sparkContext

sc.addFile(URL)

path = SparkFiles.get(os.path.split(URL)[-1])

df = spark.read.csv("file://" + path, header=True, inferSchema=True, sep=",")

df.show()
+------+--------------+---------------------------+------------+-----------------------+----------+-------------+-------------+
|gender|race/ethnicity|parental level of education|       lunch|test preparation course|math score|reading score|writing score|
+------+--------------+---------------------------+------------+-----------------------+----------+-------------+-------------+
|female|       group B|          bachelor's degree|    standard|                   none|        72|           72|           74|
|female|       group C|               some college|    standard|              completed|        69|           90|           88|
|female|       group B|            master's degree|    standard|                   none|        90|           95|           93|
|  male|       group A|         associate's degree|free/reduced|                   none|        47|           57|           44|
|  male|       group C|               some college|    standard|                   none|        76|           78|           75|
|female|       group B|         associate's degree|    standard|                   none|        71|           83|           78|
|female|       group B|               some college|    standard|              completed|        88|           95|           92|
|  male|       group B|               some college|free/reduced|                   none|        40|           43|           39|
|  male|       group D|                high school|free/reduced|              completed|        64|           64|           67|
|female|       group B|                high school|free/reduced|                   none|        38|           60|           50|
|  male|       group C|         associate's degree|    standard|                   none|        58|           54|           52|
|  male|       group D|         associate's degree|    standard|                   none|        40|           52|           43|
|female|       group B|                high school|    standard|                   none|        65|           81|           73|
|  male|       group A|               some college|    standard|              completed|        78|           72|           70|
|female|       group A|            master's degree|    standard|                   none|        50|           53|           58|
|female|       group C|           some high school|    standard|                   none|        69|           75|           78|
|  male|       group C|                high school|    standard|                   none|        88|           89|           86|
|female|       group B|           some high school|free/reduced|                   none|        18|           32|           28|
|  male|       group C|            master's degree|free/reduced|              completed|        46|           42|           46|
|female|       group C|         associate's degree|free/reduced|                   none|        54|           58|           61|
+------+--------------+---------------------------+------------+-----------------------+----------+-------------+-------------+
only showing top 20 rows


Exporter vers un fichier CSV

On fait appel à la méthode .show() pour afficher le dataframe.

OUTPUT_FILE = "StudentsPerformanceOutput.csv"

df.write.csv(path=OUTPUT_FILE, mode="overwrite")

Exporter vers un fichier Parquet

Apache Parquet est un format de fichier de données open source en colonnes, conçu pour stocker et récupérer des données avec une grande efficacité. Pour en savoir plus, merci de consulter ce lien Parquet, qu'est-ce que c'est ?

OUTPUT_FILE = "StudentsPerformance.parquet"

df.write.parquet(path=OUTPUT_FILE, mode="overwrite", partitionBy="race/ethnicity")
                                                                                

Fermer la session Spark

Une fois notre travail terminé, nous devons fermer la session.

spark.stop()

Dans la prochaine section, nous verrons comment importer des données depuis des fichiers.