Aller au contenu

Démarrer avec PySpark

Spark est une plateforme open-source de traitement de données volumineuses. Au fil des années, Spark s'est imposé comme l'outil de référence pour l'ingénierie de données. Dans ce guide ultime, je vous présente PySpark, l'API Python de Spark.

import warnings
warnings.filterwarnings("ignore")  # Ignore warnings coming from Arrow optimizations.

Importer PySpark

import pyspark
pyspark.__version__
'3.5.1'

Créer une session Spark

Nous devons importer la classe SparkSession depuis le module pyspark.sql

from pyspark.sql import SparkSession
# Créer une session Spark
spark = SparkSession.builder \
    .appName("Get Started") \
    .getOrCreate()
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/12 17:51:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/05/12 17:51:07 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.

Créer un Dataframe

Nous allons préparer une petit dataset, la liste des colonnes et les enregistrements comme liste de tuples

columns = ["id", "product_name", "product_category", "amount", "quantity"]
data = [
    (123, "Product1", "Category1", 45.67, 8),
    (124, "Product2", "Category2", 78.23, 3),
    (125, "Product3", "Category3", 32.45, 6),
    (126, "Product4", "Category1", 91.12, 5),
    (127, "Product5", "Category2", 64.78, 7)
]

Nous allons créer le dataframe à partir du dataset

df = spark.createDataFrame(data, schema=columns)

Afficher le DataFrame

On fait appel à la méthode .show() pour afficher le dataframe.

df.show()
                                                                                

+---+------------+----------------+------+--------+
| id|product_name|product_category|amount|quantity|
+---+------------+----------------+------+--------+
|123|    Product1|       Category1| 45.67|       8|
|124|    Product2|       Category2| 78.23|       3|
|125|    Product3|       Category3| 32.45|       6|
|126|    Product4|       Category1| 91.12|       5|
|127|    Product5|       Category2| 64.78|       7|
+---+------------+----------------+------+--------+


Sélection

df.select("id").show()
+---+
| id|
+---+
|123|
|124|
|125|
|126|
|127|
+---+


Tri

df.sort("quantity").show()
+---+------------+----------------+------+--------+
| id|product_name|product_category|amount|quantity|
+---+------------+----------------+------+--------+
|124|    Product2|       Category2| 78.23|       3|
|126|    Product4|       Category1| 91.12|       5|
|125|    Product3|       Category3| 32.45|       6|
|127|    Product5|       Category2| 64.78|       7|
|123|    Product1|       Category1| 45.67|       8|
+---+------------+----------------+------+--------+


Filtrage

df.filter("product_category = 'Category1'").show()
+---+------------+----------------+------+--------+
| id|product_name|product_category|amount|quantity|
+---+------------+----------------+------+--------+
|123|    Product1|       Category1| 45.67|       8|
|126|    Product4|       Category1| 91.12|       5|
+---+------------+----------------+------+--------+


Il existe cette synthaxe

df.filter(df["product_category"] == "Category1").show()
+---+------------+----------------+------+--------+
| id|product_name|product_category|amount|quantity|
+---+------------+----------------+------+--------+
|123|    Product1|       Category1| 45.67|       8|
|126|    Product4|       Category1| 91.12|       5|
+---+------------+----------------+------+--------+


Agrégation

agg_df = df.groupby("product_category").agg({"id": "count", "amount": "sum", "quantity": "sum"})

agg_df.show()
+----------------+------------------+-------------+---------+
|product_category|       sum(amount)|sum(quantity)|count(id)|
+----------------+------------------+-------------+---------+
|       Category1|136.79000000000002|           13|        2|
|       Category2|            143.01|           10|        2|
|       Category3|             32.45|            6|        1|
+----------------+------------------+-------------+---------+


Manipulation avec SQL

Spark offre la possibilité de manipuler les données avec SQL, c'est l'une de ses forces. Les professionnels des données qui utilisent SQL au quotidien s'y retrouvent assez facilement.

df.createOrReplaceTempView("lu_products")

sql_query = "SELECT * FROM lu_products WHERE id > 125"

result_df = spark.sql(sql_query)
result_df.show()
+---+------------+----------------+------+--------+
| id|product_name|product_category|amount|quantity|
+---+------------+----------------+------+--------+
|126|    Product4|       Category1| 91.12|       5|
|127|    Product5|       Category2| 64.78|       7|
+---+------------+----------------+------+--------+


Fermer la session Spark

Une fois notre travail terminé, nous devons fermer la session.

spark.stop()

Dans la prochaine section, nous verrons comment importer des données depuis des fichiers.