Dans cette première partie de ce court projet (introduit ici), je vais préparer tout le nécessaire pour faire une analyse de la défavorisation à l'aide des données publiques du gouvernement du Québec et de Statistiques Canada.

photo défavorisé

Création de la session

Je débute cette analyse en chargeant les différentes fonctionnalités dont j'aurai besoin dans cette première partie.

  • Pandas est la principale bibliothèque de fonctions pour travailler avec les structures de données
  • GeoPandas est une extension à Pandas pour manipuler les données géospatiales.
  • PySpark est une interface à Apache Spark pour Python
import pandas as pd
import geopandas as gpd
import pyspark.sql.functions as f
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler

Je vais maintenant initialiser une session Spark locale sur mon poste de travail. L'ordinateur auquel j'ai accès a 64 Go de mémoire vive, alors, je peux dédier 32 Go pour Spark. S'il y en a une qui est déjà active, je vais la réutiliser.

spark = SparkSession.builder \
.master('local[*]') \
.config("spark.driver.memory", "32g") \
.getOrCreate()

Chemins des données

Au fil des différents carnets de notes de ce projet, je vais réutiliser plusieurs fois les mêmes fichiers de données. J'ai donc défini ces chemins dans un programme Python séparé que je vais simplement charger à chaque carnet.

import paths

Données de l'indice de défavorisation par aire de diffusion (AD)

Les données fournies par l'INSPQ pour l'indice de défavorisation sont calculées au niveau de l'aire de diffusion de Statistiques Canada. Je vais charger ces données, sélectionner les colonnes qui sont pertinentes pour mon analyse et ensuite, je vais être prêt à les fusionner avec celles du recensement.

ad2016inddef_raw_df = gpd.read_file(paths.defavorisation_path)
ad2016inddef_df = spark.createDataFrame(ad2016inddef_raw_df).select(
    "ADIDU",
    "RSS",
    "ZONE",
    "NOTEMAT",
    "NOTESOC",
)

Vérifions maintenant que le type des données correspond bien au contenu des différentes variables.

ad2016inddef_df.dtypes
    [('ADIDU', 'string'),
     ('RSS', 'bigint'),
     ('ZONE', 'bigint'),
     ('NOTEMAT', 'double'),
     ('NOTESOC', 'double')]

Affichons maintenant un échantillon de données

ad2016inddef_df.limit(10).toPandas()


ADIDU RSS ZONE NOTEMAT NOTESOC
0 24790089 15 4 0.165893 -0.053274
1 24790091 15 4 0.014120 -0.028249
2 24790092 15 4 0.043328 0.022927
3 24790106 15 4 0.046330 -0.049928
4 24790107 15 4 0.084262 -0.044981
5 24790108 15 4 0.077367 -0.051872
6 24790109 15 4 0.045224 -0.024437
7 24850049 8 4 -0.007142 0.008932
8 24850050 8 4 0.007716 -0.008625
9 24850051 8 4 0.021448 0.046581


Pour Apache Spark, l'opération toPandas() est une action qui nécessite une exécution. Voici la forme que prend cette action :

001_01_topandas.png

Données du recensement

Je vais maintenant charger les données du recensement par aire de diffusion. Dans ces données, les valeurs manquantes sont identifiées par l'ellipse.... Je vais donc le spécifier comme option pour la lecture des données CSV.

recensement_raw_df = spark \
.read \
.option("nullValue",'...') \
.option("header",True) \
.csv(paths.recensement_path)

Je vais renommer quelques variables pour simplifier leur utilisation

recensement_df = recensement_raw_df.filter(
    recensement_raw_df["GEO_LEVEL"]=="4"
).withColumnRenamed(
    "Member ID: Profile of Dissemination Areas (2247)","MEMBER_ID"
).withColumnRenamed(
    "GEO_CODE (POR)","GEO_CODE"
).withColumnRenamed(
    "Dim: Sex (3): Member ID: [1]: Total - Sex","TOTAL"
).withColumnRenamed(
    "Dim: Sex (3): Member ID: [2]: Male","MALE"
).withColumnRenamed(
    "Dim: Sex (3): Member ID: [3]: Female","FEMALE"
).select(
    "MEMBER_ID",
    "GEO_CODE",
    "TOTAL",
    "MALE",
    "FEMALE"
)

Enfin, je convertis quelques valeurs sous forme de nombres entiers.

recensement_df2 = recensement_df \
.withColumn("nTOTAL",recensement_df.TOTAL.cast('integer')) \
.withColumn("nMALE",recensement_df.MALE.cast('integer')) \
.withColumn("nFEMALE",recensement_df.FEMALE.cast('integer'))

Faire pivoter les données

Les données du recensement sont structurées de façon étroite. Les variables sont donc définies dans un attribut. Pour faire de la modélisation de données, il faut rapporter ces valeurs sur un seul enregistrement en pivotant.

Pour ce faire, j'ai utilisé la technique décrite sur ce billet de blogue sur le site de Databricks : https://databricks.com/blog/2016/02/09/reshaping-data-with-pivot-in-apache-spark.html

Comme il y a des données combinées pour le sexe, puis séparées pour les hommes et les femmes, je vais effectuer le pivot pour chaque ensemble de valeurs. Afin de simplifier le code, je vais créer une fonction et l'appeler pour chacune des trois situations.

def pivot_recensement(df,nom_colonnes,nom_somme,nom_geocode):
    return df \
    .withColumn("pMEMBER_ID",f.format_string(nom_colonnes, "MEMBER_ID")) \
    .filter(recensement_df2["nTOTAL"]>0) \
    .groupBy("GEO_CODE") \
    .pivot("pMEMBER_ID") \
    .sum(nom_somme) \
    .fillna(0) \
    .withColumnRenamed("GEO_CODE",nom_geocode)

Chaque opération de pivot déclenche une action pour Apache Spark, voici à quoi ressemble le graphe :

001_02_pivot.png

recensement_ad_df_total = pivot_recensement(df=recensement_df2,
                                            nom_colonnes="total_%s",
                                            nom_somme="nTOTAL",
                                            nom_geocode="GEO_CODE_T")
recensement_ad_df_male = pivot_recensement(df=recensement_df2,
                                           nom_colonnes="male_%s",
                                           nom_somme="nMALE",
                                           nom_geocode="GEO_CODE_M")
recensement_ad_df_female = pivot_recensement(df=recensement_df2,
                                             nom_colonnes="female_%s",
                                             nom_somme="nFEMALE",
                                             nom_geocode="GEO_CODE_F")

Combiner les données

Je vais maintenant combiner les données pivotées ainsi que les données de l'indice de défavorisation en une seule table.

recensement_ad_defav_merged = \
recensement_ad_df_total \
.join(recensement_ad_df_male,
      recensement_ad_df_total["GEO_CODE_T"]==recensement_ad_df_male["GEO_CODE_M"],
      "full") \
.join(recensement_ad_df_female,
      recensement_ad_df_total["GEO_CODE_T"]==recensement_ad_df_female["GEO_CODE_F"],
      "full") \
.join(ad2016inddef_df,
      recensement_ad_df_total["GEO_CODE_T"]==ad2016inddef_df["ADIDU"],
      "full")

J'exporte maintenant ces données pour utilisation ultérieure. Ceci permet d'éviter de rouler le code précédent qui prend quelques minutes. J'utilise le format Parquet qui est un format binaire performant et adapté à Apache Spark.

Comme Spark fonctionne avec l'évaluation paresseuse, cette étape est la plus longue à exécuter, car tous les traitements précédents depuis la lecture des sources de données n'ont été qu'accumulés sous la forme d'un graphe acyclique orienté (Directed Acyclic Graph, ou DAG).

recensement_ad_defav_merged.write.mode("overwrite").parquet(paths.recensement_ad_defav_merged_path)

Le graphe d'exécution complet prend la forme suivante :

001_03_joins.png

Créer une matrice creuse

Une matrice creuse est une structure de données qui permet d'entreposer efficacement des données qui ont un grand nombre de valeurs manquantes ou égales à 0.

Voici quelques références utiles pour mieux comprendre comment utiliser les matrices creuses :

Je vais tout d'abord exclure les colonnes qui ne sont pas des attributs sur lesquels je souhaite modéliser.

list_nonfeature_colnames = [
    "ADIDU",
    "NOTEMAT",
    "NOTESOC",
    "GEO_CODE_T",
    "GEO_CODE_M",
    "GEO_CODE_F"
]

input_cols = [x for x in recensement_ad_defav_merged.columns if x not in list_nonfeature_colnames]
nonimput_cols = [x for x in recensement_ad_defav_merged.columns if x in list_nonfeature_colnames]

En utilisant la fonction VectorAssembler de PySpark, je vais créer un attribut nommé features qui contient des vecteurs creux.

recensement_ad_defav_sparse = (
    VectorAssembler(inputCols=input_cols, outputCol="features", handleInvalid = "keep")
    .transform(recensement_ad_defav_merged)
    .select(nonimput_cols+["features"]))

Je vais maintenant sauvegarder cette table sous forme de fichier Parquet pour utilisation dans les prochains carnets.

recensement_ad_defav_sparse.write.mode("overwrite").parquet(paths.recensement_ad_defav_sparse_path)

Liens

Retourner à la première partie

Article précédent Article suivant