Aujourd'hui, nous nous attaquons à un morceau conséquent de la programmation parallèle et distribuée, et nous vous proposons ce tutoriel pour comprendre l'implémentation de Spark en local et dans un cluster de machines.
Une histoire de logement
Afin de déployer un code suffisamment gourmand pour mériter d'être "sparké", nous sommes partis sur un problème ne nécessitant pas nécessairement un calcul très complexe, mais plutôt l'obligation d'effectuer ces calculs sur de nombreuses lignes avec une consolidation importante des données.
Nous constatons actuellement que l'opinion publique débat énormément, notamment sur les réseaux sociaux, à propos des logements dits vacants ou inoccupés, dans un contexte de crise du logement très prononcée. En effet, d'une part, les biens disponibles à la location et à la vente deviennent de plus en plus rares, et d'autre part, la demande pour occuper ces lieux ne cesse d'augmenter. Un article très récent de Le Monde atteste qu'un logement sur cinq à Paris serait inoccupé, alors que la demande continue de grimper.
La question est alors simple : Paris est-il un cas isolé ou un grain de semoule dans un problème plus grand qu'il n'y paraît ?
Nous avons ainsi décidé d'essayer de déterminer la part de logements dits vacants ou inoccupés dans l'ensemble des communes de France, en la rapportant à la population plutôt qu'au nombre de logements
Source et travail de la donnée
Nous avons décidé de nous baser sur les sources et données suivantes :
- Les logements vacants par commune et par EPCI en 2020
- L'historique des populations communales (sélection des données de 2020)
Ces deux fichiers comportent une colonne "numéro de la commune" dont nous pouvons nous servir afin de joindre les deux tableaux en un. Nous supprimerons aussi dans les deux fichiers les lignes dites orphelines que nous n'avons pas pu associer à une autre ligne dans l'autre fichier.
Libre accès de notre code
L'ensemble de notre dépôt Git est accessible sur le GitLab de l'université de Lille en public. Vous y trouverez notamment les éléments des sections suivantes, ainsi que notre IaaS construit avec Terraform et les scripts d'installation pour Hadoop, HDFS et YARN.
Maintenant, place à la technique pure et dure, miam ;).
Mise en place hors Spark.
Pour commencer, nous allons écrire notre programme sans Spark afin d'étudier le temps d'exécution du programme sans Spark et mieux comprendre la logique de notre code. L'ensemble du code exprimé ci-dessous se retrouve dans notre dépôt GitLab dans le dossier no_spark.
Dépendances nécessaires
Pour faire fonctionner notre programme, nous avons besoin de pandas et numpy sur Python. Nous utilisons Python en version 3.11.2.
Commençons par mettre en place notre environnement Python :
python -m venv env # Création d'un environnement pour notre projet
source env/bin/activate # Activation de notre environnement
pip install pandas numpy # Installation de pandas et numpy
Nous allons également créer deux dossiers. Le premier nous permettra de stocker notre code Python sans Spark. Dans le second, nous y mettrons nos données téléchargeables au lien communiqué précédemment.
mkdir no_spark data
Nous avons désormais tous les éléments à notre disposition pour créer notre premier programme naïf sans Spark.
Un premier code
Le pseudocode de notre solution à notre problème peut s'exprimer de la manière suivante :
population = lire_fichier_csv()
logements = lire_fichier_csv()
pop_et_log = fusionner(population, logements, par='communes')
pop_et_log[poucentage] = (nombre_logement_vacant * 100) / population_commune
pop_et_log[poucentage-2] = (nombre_logement_vacant * 100) / nombre_total_logement
sauvegarder_csv(pop_et_log)
En d'autres termes, nous devons importer les deux fichiers de données, fusionner ces deux fichiers en utilisant comme critère commun le numéro INSEE de la commune, et calculer les pourcentages du nombre de logements vacants par rapport à la population et par rapport au total de logements. Finalement, il ne nous reste plus qu'à sauvegarder ces données.
Implémentation en python
Nous allons transformer notre pseudo code précédent en code python.
# Importation des dépendances nécessaires
import pandas as pd
import numpy as np
# Importation des deux fichiers dans notre dossier data
base_pop = pd.read_csv('../data/base-pop-historiques-1876-2020.csv')
logements = pd.read_csv('../data/logements-vacants-du-parc-prive-par-commune-au-01012021-lovac.csv')
"""
Convertis un objet en int si possible
Si ele est None, 0 est retourné
"""
def convert_to_int(ele):
nele = str(ele).replace(',', '')
if ele is None or np.isnan(float(nele)):
return 0
return int(nele)
# On convertis les elements en entier
base_pop['PMUN20'] = base_pop['PMUN20'].apply(int)
logements['Nb_logvac_pp_010120'] = logements['Nb_logvac_pp_010120'].apply(convert_to_int)
logements['Nb_log_pp_2020'] = logements['Nb_log_pp_2020'].apply(convert_to_int)
# Effectuer une jointure pandas
joined_df = base_pop.merge(logements, left_on='CODGEO', right_on='INSEE_COM', how='inner')
# Calculer la colonne pourcentage
joined_df['Pourcentage vacant par population'] = (joined_df['Nb_logvac_pp_010120'] * 100) / joined_df['PMUN20']
joined_df['Pourcentage vacant par logement'] = (joined_df['Nb_logvac_pp_010120'] * 100) / joined_df['Nb_log_pp_2020']
# Sélectionner uniquement les colonnes nécessaires
result_df = joined_df[['CODGEO', 'LIBGEO', 'Nb_log_pp_2020', 'Nb_logvac_pp_010120', 'PMUN20', 'Pourcentage vacant par population', 'Pourcentage vacant par logement']]
# Trions les resultats
result_df.sort_values(by=['Pourcentage vacant par population', 'Pourcentage vacant par logement'], inplace=True, ascending=False)
# Afficher les resultats dans la sortie standart
print(result_df)
result_df.to_csv('../data/nospark-export.csv')
Et le temps d'exécution ?
Notre programme, grâce au très bon fonctionnement de Pandas, ne prend plus énormément de temps à s'exécuter (moins de 10 secondes en moyenne). Le but de ce tutoriel est essentiellement de démontrer la facilité d'utilisation et d'installation de Spark. Cependant, nous obtiendrons de meilleurs résultats si nous disposions d'un jeu de données beaucoup plus important que "seulement" des milliers de lignes de données.
Quelques notes additionnelles
Avant de passer à la mise en place de notre code avec Spark, nous souhaitions revenir sur un point durant le développement de notre premier programme. Nous avions tout d'abord privilégié une approche naïve consistant à parcourir l'ensemble de nos dataframes et les fusionner avec une simple double boucle for en Python dans notre code. Il est apparu que cette première version était impossible à réaliser malgré un ordinateur très puissant, nous sommes parvenus à être à court de RAM.
Nous avons ainsi, dans un second temps, créé le programme que nous venons de partager avec l'utilisation de jointures entre deux dataframes, transformant radicalement le temps d'exécution et la faisabilité de notre problème.
Mise en place de spark en local
Notre code sans Spark fonctionnant parfaitement, il est maintenant temps de paralléliser l'exécution de notre code sur une machine locale en utilisant les différents cœurs mis à notre disposition par la machine.
Installation de spark en local
L'installation de Spark en local consiste simplement à l'installation de la bibliothèque, ici en Python. En reprenant les prérequis précédents, nous avons juste à changer les dépendances avec pip.
pip install numpy pyspark[pandas_on_spark] pyspark # Installation de numpy pandas.pyspark et pyspark
Nous n'avons plus besoin d'installer pandas classique car nous allons utiliser pandas ré-écris par spark.
Transformation de notre code
Pour transformer notre code avec Spark, il suffit simplement de modifier les imports et de créer une session Spark. La bibliothèque PySpark Pandas ayant la même API que sa sœur Pandas, le changement de bibliothèque se fait sans encombre.
Modifications des imports et création d'une session spark :
import numpy as np
import pyspark.pandas as pd
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# --- Le reste du code est exactement identique
Vous pouvez retrouver notre code transformé sur le GitLab dans le fichier main.py
ET VOILÀ, nous venons de transformer notre code sans Spark avec Spark. Ce code est maintenant prêt à être exécuté avec Python en utilisant Spark sur une machine ou sur un cluster de machines, et spoiler... c'est l'étape suivante.
Le temps, toujours le temps
L'exécution de notre programme avec Spark est en réalité plus lente que sans Spark. En effet, la mise en place de la bibliothèque et son temps (environ quelques secondes) ne sont pas un investissement très rentable pour de petits programmes comme le nôtre. Pour autant, avec un jeu de données beaucoup plus important, nous pourrions observer sans problème le gain de temps effectif que peut nous apporter Spark.
Parallélisation sur un cluster de machine (Spark, Hadoop YARN et HDFS)
Spark peut être utilisé en mode cluster pour exécuter des charges de travail réparties entre plusieurs nœuds. Plusieurs gestionnaires de cluster sont compatibles avec Spark, dont Hadoop YARN. Pour stocker nos données ainsi que le résultat de nos calculs, nous utilisons HDFS, un système de fichier distribué compatible avec Hadoop YARN.
Installation de Hadoop YARN et HDFS
Nous réalisons cette installation sur des instances Ubuntu 22.04 avec l'utilisateur ubuntu
:
La première étape de notre installation est d'initialiser 3 VMs : l'une sera un gestionnaire de nœuds (node-master) et les deux autres seront des nœuds de travail (node1, node2).
Sur nos 3 VMs, nous installons les dépendances nécessaires et nous téléchargeons Hadoop YARN depuis le repo officiel :
sudo apt update
sudo apt install -y openjdk-8-jre-headless
echo 'export JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64"' >> ~/.profile
echo 'JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64"' >> /etc/environment
HADOOP_VERSION="3.5.0"
wget https://dlcdn.apache.org/hadoop/common/hadoop-$HADOOP_VERSION/hadoop-$HADOOP_VERSION.tar.gz
tar -xzf hadoop-$HADOOP_VERSION.tar.gz
mv hadoop-$HADOOP_VERSION.tar.gz hadoop
echo 'export PATH="${PATH}:/home/ubuntu/hadoop/bin:/home/ubuntu/hadoop/sbin"
export HADOOP_CONF_DIR=/home/ubuntu/hadoop/etc/hadoop
export LD_LIBRARY_PATH="/home/ubuntu/hadoop/lib/native:$LD_LIBRARY_PATH"' >> ~/.bashrc
source ~/.bashrc
Avant de démarrer nos nœuds Hadoop YARN, nous devons modifier certains fichiers de configuration pour indiquer l'adresse de nos nœuds de travail, paramétrer HDFS et configurer YARN.
~/hadoop/etc/hadoop/code-site.xml
:
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://node-master:9000</value>
</property>
</configuration>
~/hadoop/etc/hadoop/hdfs-site.xml
:
<configuration>
<property>
<name>dfs.namenode.name.dir</name>
<value>/home/ubuntu/hadoop/data/nameNode</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>/home/ubuntu/hadoop/data/dataNode</value>
</property>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>
~/hadoop/etc/hadoop/hdfs-site.xml
:
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>yarn.app.mapreduce.am.env</name>
<value>HADOOP_MAPRED_HOME=/home/ubuntu/hadoop</value>
</property>
<property>
<name>mapreduce.map.env</name>
<value>HADOOP_MAPRED_HOME=/home/ubuntu/hadoop</value>
</property>
<property>
<name>mapreduce.reduce.env</name>
<value>HADOOP_MAPRED_HOME=/home/ubuntu/hadoop</value>
</property>
<property>
<name>yarn.app.mapreduce.am.resource.mb</name>
<value>512</value>
</property>
<property>
<name>mapreduce.map.memory.mb</name>
<value>256</value>
</property>
<property>
<name>mapreduce.reduce.memory.mb</name>
<value>256</value>
</property>
</configuration>
~/hadoop/etc/hadoop/yarn-site.xml
:
<configuration>
<property>
<name>yarn.acl.enable</name>
<value>0</value>
</property>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>node-master</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>1536</value>
</property>
<property>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>1536</value>
</property>
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>128</value>
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
</configuration>
~/hadoop/etc/hadoop/workers
:
node1
node2
Nous pouvons maintenant lancer nos nœuds :
# Initialisation de HDFS
hdfs namenode -format
start-dfs.sh
# Lancement de YARN
start-yarn.sh
Installation de Spark
Notre cluster est maintenant prêt à recevoir des charges de travail à exécuter. Nous devons donc installer Spark afin de les créer, puis de les envoyer au cluster. Nous téléchargeons Spark à partir du référentiel officiel.
SPARK="3.5.0"
wget https://dlcdn.apache.org/spark/spark-$HADOOP_VERSION/spark-$HADOOP_VERSION-bin-hadoop3.tgz
tar -xvf spark-$HADOOP_VERSION-bin-hadoop3.tgz
mv spark-$HADOOP_VERSION-bin-hadoop3 spark
echo "export PATH=/home/ubuntu/spark/bin:$PATH
export SPARK_HOME=/home/ubuntu/spark" >> ~/.bashrc
source .bashrc
mv $SPARK_HOME/conf/spark-defaults.conf.template $SPARK_HOME/conf/spark-defaults.conf
echo "spark.master yarn
spark.driver.memory 512m
spark.yarn.am.memory 512m
spark.executor.memory 512m
spark.eventLog.enabled true
spark.eventLog.dir hdfs://node-master:9000/spark-logs
spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider
spark.history.fs.logDirectory hdfs://node-master:9000/spark-logs
spark.history.fs.update.interval 10s
spark.history.ui.port 18080" >> $SPARK_HOME/conf/spark-defaults.conf
# Création du dossier de logs Spark sur HDFS
hdfs dfs -mkdir /spark-logs
Installation de dépendances Python et des données de notre script
Notre charge de travail est un script Python nécessitant les bibliothèques numpy et pandas. Le plus simple est de procéder à leur installation sur chaque nœud. L'utilisation de sudo permet d'installer les bibliothèques pour tous.
sudo python3 -m pip install pandas numpy pyspark[pandas_on_spark] pyspark
Notre script nécessite les données de deux fichiers CSV. Pour rendre ces données accessibles lors de l'exécution sur le cluster Spark, nous devons transférer ces fichiers sur HDFS.
hdfs dfs -mkdir data
hdfs dfs -put data/logements-vacants-du-parc-prive-par-commune-au-01012021-lovac.csv data/base-pop-historiques-1876-2020.csv data
Lancement de notre charge de travail sur le cluster
Le lancement de notre charge de travail se fait grâce à la commande suivante :
spark-submit --master yarn --deploy-mode cluster --driver-memory 1g --executor-memory 1g --executor-cores 1 main.py
Une fois de charges de travail terminée, les données résultantes du script sont disponibles sur HDFS dans le dossier data/exports
:
hdfs dfs -ls data/exports
hdfs dfs -get 'data/exports/*.csv' .
Et notre problème de logement dans tout cela ?
Bien que notre problématique n'était qu'un prétexte pour tester Spark, il est tout de même important d'apporter une réponse : Paris n'est pas un cas isolé dans ce problème de logement vacant. Notamment, on y retrouve le village de Saint-Véran dans les Alpes ou la station Montclar ou Orcières. En d'autres termes, le problème de logement vacant semble encore plus accentué dans les zones touristiques et favorisé pendant les vacances, très probablement, car il doit s'agir de résidences secondaires.
Excellent article, très clair et instructif!
!PGM
Your post has been manually reviewed for curation.
Principality of Bastion's Tavern - Our Leit Motiv? Let's Grow Together.
Discord | ECU | Site | Twitch | Donations | Paypal via Streamlabs
One click delegations: 500 HP | 1500 HP | 5000 HP |25000 HP | 100000 HP
Or delegate the amount you decide to @hive-143869, using peakd's wallet, for example.
BUY AND STAKE THE PGM TO SEND A LOT OF TOKENS!
The tokens that the command sends are: 0.1 PGM-0.1 LVL-0.1 THGAMING-0.05 DEC-15 SBT-1 STARBITS-[0.00000001 BTC (SWAP.BTC) only if you have 2500 PGM in stake or more ]
5000 PGM IN STAKE = 2x rewards!
Discord
Support the curation account @ pgm-curator with a delegation 10 HP - 50 HP - 100 HP - 500 HP - 1000 HP
Get potential votes from @ pgm-curator by paying in PGM, here is a guide
I'm a bot, if you want a hand ask @ zottone444
Congratulations @boutvalentin! You have completed the following achievement on the Hive blockchain And have been rewarded with New badge(s)
Your next target is to reach 100 upvotes.
You can view your badges on your board and compare yourself to others in the Ranking
If you no longer want to receive notifications, reply to this comment with the word
STOP
Check out our last posts: