Mise en place et experimentation de Spark

Aujourd'hui, nous nous attaquons à un morceau conséquent de la programmation parallèle et distribuée, et nous vous proposons ce tutoriel pour comprendre l'implémentation de Spark en local et dans un cluster de machines.

Une histoire de logement

Photo de @naletu sur Unsplash

Afin de déployer un code suffisamment gourmand pour mériter d'être "sparké", nous sommes partis sur un problème ne nécessitant pas nécessairement un calcul très complexe, mais plutôt l'obligation d'effectuer ces calculs sur de nombreuses lignes avec une consolidation importante des données.

Nous constatons actuellement que l'opinion publique débat énormément, notamment sur les réseaux sociaux, à propos des logements dits vacants ou inoccupés, dans un contexte de crise du logement très prononcée. En effet, d'une part, les biens disponibles à la location et à la vente deviennent de plus en plus rares, et d'autre part, la demande pour occuper ces lieux ne cesse d'augmenter. Un article très récent de Le Monde atteste qu'un logement sur cinq à Paris serait inoccupé, alors que la demande continue de grimper.

La question est alors simple : Paris est-il un cas isolé ou un grain de semoule dans un problème plus grand qu'il n'y paraît ?

Nous avons ainsi décidé d'essayer de déterminer la part de logements dits vacants ou inoccupés dans l'ensemble des communes de France, en la rapportant à la population plutôt qu'au nombre de logements

Source et travail de la donnée

Nous avons décidé de nous baser sur les sources et données suivantes :

Ces deux fichiers comportent une colonne "numéro de la commune" dont nous pouvons nous servir afin de joindre les deux tableaux en un. Nous supprimerons aussi dans les deux fichiers les lignes dites orphelines que nous n'avons pas pu associer à une autre ligne dans l'autre fichier.

Libre accès de notre code

L'ensemble de notre dépôt Git est accessible sur le GitLab de l'université de Lille en public. Vous y trouverez notamment les éléments des sections suivantes, ainsi que notre IaaS construit avec Terraform et les scripts d'installation pour Hadoop, HDFS et YARN.

Maintenant, place à la technique pure et dure, miam ;).

Mise en place hors Spark.

Pour commencer, nous allons écrire notre programme sans Spark afin d'étudier le temps d'exécution du programme sans Spark et mieux comprendre la logique de notre code. L'ensemble du code exprimé ci-dessous se retrouve dans notre dépôt GitLab dans le dossier no_spark.

Dépendances nécessaires

Pour faire fonctionner notre programme, nous avons besoin de pandas et numpy sur Python. Nous utilisons Python en version 3.11.2.
Commençons par mettre en place notre environnement Python :

python -m venv env # Création d'un environnement pour notre projet
source env/bin/activate # Activation de notre environnement
pip install pandas numpy # Installation de pandas et numpy

Nous allons également créer deux dossiers. Le premier nous permettra de stocker notre code Python sans Spark. Dans le second, nous y mettrons nos données téléchargeables au lien communiqué précédemment.

mkdir no_spark data

Nous avons désormais tous les éléments à notre disposition pour créer notre premier programme naïf sans Spark.

Un premier code

Le pseudocode de notre solution à notre problème peut s'exprimer de la manière suivante :

population = lire_fichier_csv()
logements = lire_fichier_csv()

pop_et_log = fusionner(population, logements, par='communes')

pop_et_log[poucentage] = (nombre_logement_vacant * 100) / population_commune
pop_et_log[poucentage-2] = (nombre_logement_vacant * 100) / nombre_total_logement

sauvegarder_csv(pop_et_log)

En d'autres termes, nous devons importer les deux fichiers de données, fusionner ces deux fichiers en utilisant comme critère commun le numéro INSEE de la commune, et calculer les pourcentages du nombre de logements vacants par rapport à la population et par rapport au total de logements. Finalement, il ne nous reste plus qu'à sauvegarder ces données.

Implémentation en python

Nous allons transformer notre pseudo code précédent en code python.

# Importation des dépendances nécessaires
import pandas as pd
import numpy as np

# Importation des deux fichiers dans notre dossier data
base_pop = pd.read_csv('../data/base-pop-historiques-1876-2020.csv')
logements = pd.read_csv('../data/logements-vacants-du-parc-prive-par-commune-au-01012021-lovac.csv')

"""
Convertis un objet en int si possible
Si ele est None, 0 est retourné
"""
def convert_to_int(ele):
    nele = str(ele).replace(',', '')
    if ele is None or np.isnan(float(nele)):
        return 0
    return int(nele)

# On convertis les elements en entier
base_pop['PMUN20'] = base_pop['PMUN20'].apply(int)
logements['Nb_logvac_pp_010120'] = logements['Nb_logvac_pp_010120'].apply(convert_to_int)
logements['Nb_log_pp_2020'] = logements['Nb_log_pp_2020'].apply(convert_to_int)

# Effectuer une jointure pandas
joined_df = base_pop.merge(logements, left_on='CODGEO', right_on='INSEE_COM', how='inner')

# Calculer la colonne pourcentage
joined_df['Pourcentage vacant par population'] = (joined_df['Nb_logvac_pp_010120'] * 100) / joined_df['PMUN20']
joined_df['Pourcentage vacant par logement'] = (joined_df['Nb_logvac_pp_010120'] * 100) / joined_df['Nb_log_pp_2020']

# Sélectionner uniquement les colonnes nécessaires
result_df = joined_df[['CODGEO', 'LIBGEO', 'Nb_log_pp_2020', 'Nb_logvac_pp_010120', 'PMUN20', 'Pourcentage vacant par population', 'Pourcentage vacant par logement']]

# Trions les resultats
result_df.sort_values(by=['Pourcentage vacant par population', 'Pourcentage vacant par logement'], inplace=True, ascending=False)

# Afficher les resultats dans la sortie standart
print(result_df)

result_df.to_csv('../data/nospark-export.csv')

Et le temps d'exécution ?

Notre programme, grâce au très bon fonctionnement de Pandas, ne prend plus énormément de temps à s'exécuter (moins de 10 secondes en moyenne). Le but de ce tutoriel est essentiellement de démontrer la facilité d'utilisation et d'installation de Spark. Cependant, nous obtiendrons de meilleurs résultats si nous disposions d'un jeu de données beaucoup plus important que "seulement" des milliers de lignes de données.

Quelques notes additionnelles

Avant de passer à la mise en place de notre code avec Spark, nous souhaitions revenir sur un point durant le développement de notre premier programme. Nous avions tout d'abord privilégié une approche naïve consistant à parcourir l'ensemble de nos dataframes et les fusionner avec une simple double boucle for en Python dans notre code. Il est apparu que cette première version était impossible à réaliser malgré un ordinateur très puissant, nous sommes parvenus à être à court de RAM.

Nous avons ainsi, dans un second temps, créé le programme que nous venons de partager avec l'utilisation de jointures entre deux dataframes, transformant radicalement le temps d'exécution et la faisabilité de notre problème.

Mise en place de spark en local

Notre code sans Spark fonctionnant parfaitement, il est maintenant temps de paralléliser l'exécution de notre code sur une machine locale en utilisant les différents cœurs mis à notre disposition par la machine.

Installation de spark en local

L'installation de Spark en local consiste simplement à l'installation de la bibliothèque, ici en Python. En reprenant les prérequis précédents, nous avons juste à changer les dépendances avec pip.

pip install numpy pyspark[pandas_on_spark] pyspark # Installation de numpy pandas.pyspark et pyspark

Nous n'avons plus besoin d'installer pandas classique car nous allons utiliser pandas ré-écris par spark.

Transformation de notre code

Pour transformer notre code avec Spark, il suffit simplement de modifier les imports et de créer une session Spark. La bibliothèque PySpark Pandas ayant la même API que sa sœur Pandas, le changement de bibliothèque se fait sans encombre.

Modifications des imports et création d'une session spark :

import numpy as np
import pyspark.pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# --- Le reste du code est exactement identique

Vous pouvez retrouver notre code transformé sur le GitLab dans le fichier main.py

ET VOILÀ, nous venons de transformer notre code sans Spark avec Spark. Ce code est maintenant prêt à être exécuté avec Python en utilisant Spark sur une machine ou sur un cluster de machines, et spoiler... c'est l'étape suivante.

Le temps, toujours le temps

L'exécution de notre programme avec Spark est en réalité plus lente que sans Spark. En effet, la mise en place de la bibliothèque et son temps (environ quelques secondes) ne sont pas un investissement très rentable pour de petits programmes comme le nôtre. Pour autant, avec un jeu de données beaucoup plus important, nous pourrions observer sans problème le gain de temps effectif que peut nous apporter Spark.

Parallélisation sur un cluster de machine (Spark, Hadoop YARN et HDFS)

Spark peut être utilisé en mode cluster pour exécuter des charges de travail réparties entre plusieurs nœuds. Plusieurs gestionnaires de cluster sont compatibles avec Spark, dont Hadoop YARN. Pour stocker nos données ainsi que le résultat de nos calculs, nous utilisons HDFS, un système de fichier distribué compatible avec Hadoop YARN.

Installation de Hadoop YARN et HDFS

Nous réalisons cette installation sur des instances Ubuntu 22.04 avec l'utilisateur ubuntu⁣ :
La première étape de notre installation est d'initialiser 3 VMs : l'une sera un gestionnaire de nœuds (node-master) et les deux autres seront des nœuds de travail (node1, node2).

Sur nos 3 VMs, nous installons les dépendances nécessaires et nous téléchargeons Hadoop YARN depuis le repo officiel :

sudo  apt  update
sudo  apt  install  -y  openjdk-8-jre-headless

echo  'export JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64"'  >>  ~/.profile
echo  'JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64"'  >>  /etc/environment

HADOOP_VERSION="3.5.0"
wget https://dlcdn.apache.org/hadoop/common/hadoop-$HADOOP_VERSION/hadoop-$HADOOP_VERSION.tar.gz
tar -xzf hadoop-$HADOOP_VERSION.tar.gz
mv hadoop-$HADOOP_VERSION.tar.gz hadoop

echo  'export PATH="${PATH}:/home/ubuntu/hadoop/bin:/home/ubuntu/hadoop/sbin"
export HADOOP_CONF_DIR=/home/ubuntu/hadoop/etc/hadoop
export LD_LIBRARY_PATH="/home/ubuntu/hadoop/lib/native:$LD_LIBRARY_PATH"' >>  ~/.bashrc
source ~/.bashrc

Avant de démarrer nos nœuds Hadoop YARN, nous devons modifier certains fichiers de configuration pour indiquer l'adresse de nos nœuds de travail, paramétrer HDFS et configurer YARN.

~/hadoop/etc/hadoop/code-site.xml :

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <configuration>
        <property>
            <name>fs.default.name</name>
            <value>hdfs://node-master:9000</value>
        </property>
    </configuration>


~/hadoop/etc/hadoop/hdfs-site.xml :

<configuration>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>/home/ubuntu/hadoop/data/nameNode</value>
    </property>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>/home/ubuntu/hadoop/data/dataNode</value>
    </property>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>


~/hadoop/etc/hadoop/hdfs-site.xml :

<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
    <property>
        <name>yarn.app.mapreduce.am.env</name>
        <value>HADOOP_MAPRED_HOME=/home/ubuntu/hadoop</value>
    </property>
    <property>
        <name>mapreduce.map.env</name>
        <value>HADOOP_MAPRED_HOME=/home/ubuntu/hadoop</value>
    </property>
    <property>
        <name>mapreduce.reduce.env</name>
        <value>HADOOP_MAPRED_HOME=/home/ubuntu/hadoop</value>
    </property>
    <property>
        <name>yarn.app.mapreduce.am.resource.mb</name>
        <value>512</value>
    </property>
    <property>
        <name>mapreduce.map.memory.mb</name>
        <value>256</value>
    </property>
    <property>
        <name>mapreduce.reduce.memory.mb</name>
        <value>256</value>
    </property>
</configuration>


~/hadoop/etc/hadoop/yarn-site.xml :

<configuration>
    <property>
        <name>yarn.acl.enable</name>
        <value>0</value>
    </property>
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>node-master</value>
    </property>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <name>yarn.nodemanager.resource.memory-mb</name>
        <value>1536</value>
    </property>
    <property>
        <name>yarn.scheduler.maximum-allocation-mb</name>
        <value>1536</value>
    </property>
    <property>
        <name>yarn.scheduler.minimum-allocation-mb</name>
        <value>128</value>
    </property>
    <property>
        <name>yarn.nodemanager.vmem-check-enabled</name>
        <value>false</value>
    </property>
</configuration>


~/hadoop/etc/hadoop/workers:

node1
node2

Nous pouvons maintenant lancer nos nœuds :

# Initialisation de HDFS
hdfs  namenode  -format
start-dfs.sh

# Lancement de YARN
start-yarn.sh

Installation de Spark

Notre cluster est maintenant prêt à recevoir des charges de travail à exécuter. Nous devons donc installer Spark afin de les créer, puis de les envoyer au cluster. Nous téléchargeons Spark à partir du référentiel officiel.

SPARK="3.5.0"
wget  https://dlcdn.apache.org/spark/spark-$HADOOP_VERSION/spark-$HADOOP_VERSION-bin-hadoop3.tgz
tar  -xvf  spark-$HADOOP_VERSION-bin-hadoop3.tgz
mv  spark-$HADOOP_VERSION-bin-hadoop3  spark

echo "export PATH=/home/ubuntu/spark/bin:$PATH
export SPARK_HOME=/home/ubuntu/spark" >>  ~/.bashrc
source  .bashrc

mv  $SPARK_HOME/conf/spark-defaults.conf.template  $SPARK_HOME/conf/spark-defaults.conf

echo  "spark.master yarn
spark.driver.memory 512m
spark.yarn.am.memory 512m
spark.executor.memory 512m
spark.eventLog.enabled true
spark.eventLog.dir hdfs://node-master:9000/spark-logs
spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider
spark.history.fs.logDirectory hdfs://node-master:9000/spark-logs
spark.history.fs.update.interval 10s
spark.history.ui.port 18080"  >>  $SPARK_HOME/conf/spark-defaults.conf

# Création du dossier de logs Spark sur HDFS
hdfs  dfs  -mkdir  /spark-logs

Installation de dépendances Python et des données de notre script

Notre charge de travail est un script Python nécessitant les bibliothèques numpy et pandas. Le plus simple est de procéder à leur installation sur chaque nœud. L'utilisation de sudo permet d'installer les bibliothèques pour tous.

sudo python3 -m pip install pandas numpy pyspark[pandas_on_spark] pyspark

Notre script nécessite les données de deux fichiers CSV. Pour rendre ces données accessibles lors de l'exécution sur le cluster Spark, nous devons transférer ces fichiers sur HDFS.

hdfs dfs -mkdir data
hdfs dfs -put data/logements-vacants-du-parc-prive-par-commune-au-01012021-lovac.csv data/base-pop-historiques-1876-2020.csv data

Lancement de notre charge de travail sur le cluster

Le lancement de notre charge de travail se fait grâce à la commande suivante :

spark-submit --master yarn --deploy-mode cluster --driver-memory 1g --executor-memory 1g --executor-cores 1 main.py

Une fois de charges de travail terminée, les données résultantes du script sont disponibles sur HDFS dans le dossier data/exports :

hdfs dfs -ls data/exports
hdfs dfs -get 'data/exports/*.csv' .

Et notre problème de logement dans tout cela ?

Bien que notre problématique n'était qu'un prétexte pour tester Spark, il est tout de même important d'apporter une réponse : Paris n'est pas un cas isolé dans ce problème de logement vacant. Notamment, on y retrouve le village de Saint-Véran dans les Alpes ou la station Montclar ou Orcières. En d'autres termes, le problème de logement vacant semble encore plus accentué dans les zones touristiques et favorisé pendant les vacances, très probablement, car il doit s'agir de résidences secondaires.

Sort:  

Excellent article, très clair et instructif!

!PGM

Your post has been manually reviewed for curation.

separator2.png

Principality of Bastion's Tavern - Our Leit Motiv? Let's Grow Together.

Discord | ECU | Site | Twitch | Donations | Paypal via Streamlabs

One click delegations: 500 HP | 1500 HP | 5000 HP |25000 HP | 100000 HP
Or delegate the amount you decide to @hive-143869, using peakd's wallet, for example.

JON.gif

separator2.png

Sent 0.1 PGM - 0.1 LVL- 1 STARBITS - 0.05 DEC - 1 SBT - 0.1 THG - 0.000001 SQM - 0.1 BUDS - 0.01 WOO - 0.005 SCRAP - 0.001 INK tokens

remaining commands 4

BUY AND STAKE THE PGM TO SEND A LOT OF TOKENS!

The tokens that the command sends are: 0.1 PGM-0.1 LVL-0.1 THGAMING-0.05 DEC-15 SBT-1 STARBITS-[0.00000001 BTC (SWAP.BTC) only if you have 2500 PGM in stake or more ]

5000 PGM IN STAKE = 2x rewards!

image.png
Discord image.png

Support the curation account @ pgm-curator with a delegation 10 HP - 50 HP - 100 HP - 500 HP - 1000 HP

Get potential votes from @ pgm-curator by paying in PGM, here is a guide

I'm a bot, if you want a hand ask @ zottone444


Congratulations @boutvalentin! You have completed the following achievement on the Hive blockchain And have been rewarded with New badge(s)

You received more than 50 upvotes.
Your next target is to reach 100 upvotes.

You can view your badges on your board and compare yourself to others in the Ranking
If you no longer want to receive notifications, reply to this comment with the word STOP

Check out our last posts:

LEO Power Up Day - December 15, 2023