Aller au contenu

Démarrer avec PySpark#

Spark est une plateforme open-source de traitement de données volumineuses. Au fil des années, Spark s'est imposé comme l'outil de référence pour l'ingénierie de données. Dans ce guide ultime, je vous présente PySpark, l'API Python de Spark.

import warnings
warnings.filterwarnings("ignore")  # Ignore warnings coming from Arrow optimizations.

Importer PySpark#

import pyspark
pyspark.__version__
'3.5.1'

Créer une session Spark#

Nous devons importer la classe SparkSession depuis le module pyspark.sql

from pyspark.sql import SparkSession
# Créer une session Spark
spark = SparkSession.builder \
    .appName("Get Started") \
    .getOrCreate()
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/12 17:51:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/05/12 17:51:07 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.

Créer un Dataframe#

Nous allons préparer une petit dataset, la liste des colonnes et les enregistrements comme liste de tuples

columns = ["id", "product_name", "product_category", "amount", "quantity"]
data = [
    (123, "Product1", "Category1", 45.67, 8),
    (124, "Product2", "Category2", 78.23, 3),
    (125, "Product3", "Category3", 32.45, 6),
    (126, "Product4", "Category1", 91.12, 5),
    (127, "Product5", "Category2", 64.78, 7)
]

Nous allons créer le dataframe à partir du dataset

df = spark.createDataFrame(data, schema=columns)

Afficher le DataFrame#

On fait appel à la méthode .show() pour afficher le dataframe.

df.show()
                                                                                

+---+------------+----------------+------+--------+
| id|product_name|product_category|amount|quantity|
+---+------------+----------------+------+--------+
|123|    Product1|       Category1| 45.67|       8|
|124|    Product2|       Category2| 78.23|       3|
|125|    Product3|       Category3| 32.45|       6|
|126|    Product4|       Category1| 91.12|       5|
|127|    Product5|       Category2| 64.78|       7|
+---+------------+----------------+------+--------+


Sélection#

df.select("id").show()
+---+
| id|
+---+
|123|
|124|
|125|
|126|
|127|
+---+


Tri#

df.sort("quantity").show()
+---+------------+----------------+------+--------+
| id|product_name|product_category|amount|quantity|
+---+------------+----------------+------+--------+
|124|    Product2|       Category2| 78.23|       3|
|126|    Product4|       Category1| 91.12|       5|
|125|    Product3|       Category3| 32.45|       6|
|127|    Product5|       Category2| 64.78|       7|
|123|    Product1|       Category1| 45.67|       8|
+---+------------+----------------+------+--------+


Filtrage#

df.filter("product_category = 'Category1'").show()
+---+------------+----------------+------+--------+
| id|product_name|product_category|amount|quantity|
+---+------------+----------------+------+--------+
|123|    Product1|       Category1| 45.67|       8|
|126|    Product4|       Category1| 91.12|       5|
+---+------------+----------------+------+--------+


Il existe cette synthaxe

df.filter(df["product_category"] == "Category1").show()
+---+------------+----------------+------+--------+
| id|product_name|product_category|amount|quantity|
+---+------------+----------------+------+--------+
|123|    Product1|       Category1| 45.67|       8|
|126|    Product4|       Category1| 91.12|       5|
+---+------------+----------------+------+--------+


Agrégation#

agg_df = df.groupby("product_category").agg({"id": "count", "amount": "sum", "quantity": "sum"})

agg_df.show()
+----------------+------------------+-------------+---------+
|product_category|       sum(amount)|sum(quantity)|count(id)|
+----------------+------------------+-------------+---------+
|       Category1|136.79000000000002|           13|        2|
|       Category2|            143.01|           10|        2|
|       Category3|             32.45|            6|        1|
+----------------+------------------+-------------+---------+


Manipulation avec SQL#

Spark offre la possibilité de manipuler les données avec SQL, c'est l'une de ses forces. Les professionnels des données qui utilisent SQL au quotidien s'y retrouvent assez facilement.

df.createOrReplaceTempView("lu_products")

sql_query = "SELECT * FROM lu_products WHERE id > 125"

result_df = spark.sql(sql_query)
result_df.show()
+---+------------+----------------+------+--------+
| id|product_name|product_category|amount|quantity|
+---+------------+----------------+------+--------+
|126|    Product4|       Category1| 91.12|       5|
|127|    Product5|       Category2| 64.78|       7|
+---+------------+----------------+------+--------+


Fermer la session Spark#

Une fois notre travail terminé, nous devons fermer la session.

spark.stop()

Dans la prochaine section, nous verrons comment importer des données depuis des fichiers.