Sfruttare l’apprendimento automatico su Big Data con PySpark su AWS

Utilizzare l'apprendimento automatico su Big Data con PySpark su AWS

Nota dell’editore: Suman Debnath è un relatore per ODSC APAC il 22-23 agosto. Assicurati di seguire il suo intervento, “Costruire modelli di classificazione e regressione con Spark su AWS”, lì!

Nell’arena inesorabilmente dinamica della scienza dei dati, comprendere e applicare gli strumenti giusti può plasmare significativamente i risultati delle tue iniziative di apprendimento automatico. Un caloroso saluto a tutti gli appassionati di scienza dei dati! Mi considero fortunato ad avere l’opportunità di parlare alla prossima conferenza ODSC APAC prevista per il 22 agosto 2023. La mia presentazione si concentrerà sullo sviluppo di modelli di classificazione e regressione utilizzando PySpark su AWS.

Comprensione della sessione

In questa sessione coinvolgente e interattiva, approfondiremo PySpark MLlib, una risorsa preziosa nel campo del machine learning, ed esploreremo come vari algoritmi di classificazione possono essere implementati utilizzando AWS Glue/EMR come piattaforma.

Il nostro focus sarà pratico, con un’attenzione all’applicazione pratica e alla comprensione dei concetti essenziali del machine learning. I partecipanti saranno introdotti a una varietà di algoritmi di machine learning, mettendo in evidenza la regressione logistica, una potente tecnica di apprendimento supervisionato per risolvere problemi di classificazione binaria.

Ma questa sessione va oltre i concetti e gli algoritmi. Esploreremo anche tecniche critiche di pre-elaborazione dei dati, essenziali per creare modelli di apprendimento automatico efficaci. Alla conclusione della sessione, i partecipanti acquisiranno competenze per gestire valori mancanti, modificare i tipi di dati delle colonne e dividere i loro dati in set di dati di addestramento e di test. Questa esperienza pratica avrà luogo nell’ambiente versatile di AWS Glue/EMR.

Cosa otterrai?

Questa sessione è progettata per aiutare i partecipanti a ottenere una comprensione approfondita di:

  • PySpark MLlib
  • Tecniche di apprendimento non supervisionato
  • Diversi tipi di algoritmi di classificazione
  • Implementazione di classificatori di regressione logistica
  • Pre-elaborazione dei dati utilizzando PySpark su AWS utilizzando AWS Glue e Amazon EMR
  • Creazione di modelli con PySpark su AWS

Se sei un ingegnere dei dati, uno scienziato dei dati o un appassionato di apprendimento automatico che desidera iniziare con Machine Learning con Apache Spark su AWS, questa sessione è perfetta per te.

Ora, diamo un’occhiata a ciò che ti aspetta (il repository di codice GitHub si trova qui).

Abbiamo selezionato un dataset composto da 20.057 nomi di piatti, ognuno dei quali dettagliato con 680 colonne che caratterizzano l’elenco degli ingredienti, il contenuto nutrizionale e la categoria del piatto. Il nostro obiettivo collettivo qui è prevedere se un piatto è un dessert. Si tratta di una domanda semplice e per lo più chiara: la maggior parte di noi può probabilmente classificare un piatto come dessert o meno semplicemente leggendone il nome, il che lo rende un ottimo candidato per un modello di ML semplice.

Passo 1: Importazione delle librerie necessarie

Il primo passo prevede l’importazione delle librerie necessarie, inclusi le funzioni e i tipi di dati di PySpark SQL

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.ml.feature import Imputer, MinMaxScaler, VectorAssembler

Passo 2: Pre-elaborazione dei dati e EDA (Exploratory Data Analysis)

Carichiamo il dataset CSV delle ricette alimentari utilizzando la funzione read.csv di Spark. Il parametro inferSchema è impostato su True per inferire i tipi di dati delle colonne, e l’intestazione è impostata su True per utilizzare la prima riga come intestazioni.

# Caricamento dei dati
dataset = 's3://fcc-spark-example/dataset/2023/recipes_dataset/epi_r.csv'
food = (
          spark
              .read
              .csv(dataset, inferSchema=True, header=True)
      )
     
# Sanificazione dei nomi delle colonne
def sanitize_column_name(name):
  answer = name
  for i, j in ((" ", "_"), ("-", "_"), ("/", "_"), ("&", "and")):
      answer = answer.replace(i, j)
  return "".join(
      [
          char
          for char in answer
          if char.isalpha() or char.isdigit() or char == "_"
      ]
  )
food = food.toDF(*[sanitize_column_name(name) for name in food.columns])

Questa parte dello script sanifica i nomi delle colonne sostituendo spazi, trattini, barre e ampersand con underscore. Rimuove anche caratteri non alfanumerici.

# Filtraggio dei dati
food = food.where(
  (
      F.col("cakeweek").isin([0.0, 1.0])
      | F.col("cakeweek").isNull()
  )
  & (
      F.col("wasteless").isin([0.0, 1.0])
      | F.col("wasteless").isNull()
  )
)

In questa sezione filtriamo i dati per mantenere solo le righe in cui le colonne cakeweek e wasteless hanno valori di 0.0 o 1.0, o sono nulli.

# Definizione delle colonne identificative, continue, target e binarie
IDENTIFICATORI = ["title"]
COLONNE_CONTINUE = [
  "rating",
  "calories",
  "proteine",
  "grassi",
  "sodio",
]
COLONNA_TARGET = ["dessert"]
COLONNE_BINARIE = [
  x
  for x in food.columns
  if x not in COLONNE_CONTINUE
  and x not in COLONNA_TARGET
  and x not in IDENTIFICATORI
]

In questa sezione definiamo quali colonne sono identificatori, variabili continue, variabili target e variabili binarie.

# Gestione dei valori mancanti
food = food.dropna(
  how="all",
  subset=[x for x in food.columns if x not in IDENTIFICATORI],
)
food = food.dropna(subset=COLONNA_TARGET)
food = food.fillna(0.0, subset=COLONNE_BINARIE)

Gestiamo i valori mancanti eliminando le righe che hanno tutti valori nulli (escludendo le colonne identificative), eliminando le righe con valori nulli nella colonna target e riempiendo i valori nulli nelle colonne binarie con 0.0.

# Conversione dei numeri in stringa in float e limitazione delle variabili continue
from typing import Optional

@F.udf(T.BooleanType())
def is_a_number(value: Optional[str]) -> bool:
  if not value:
      return True
  try:
      _ = float(value)
  except ValueError:
      return False
  return True
for column in ["rating", "calories"]:
  food = food.where(is_a_number(F.col(column)))
  food = food.withColumn(column, F.col(column).cast(T.DoubleType()))
massimo = {
  "calories": 3203.0,
  "proteine": 173.0,
  "grassi": 207.0,
  "sodio": 5661.0,
}
for k, v in massimo.items():
  food = food.withColumn(
      k,
      F.when(F.isnull(F.col(k)), F.col(k)).otherwise(
          F.least(F.col(k), F.lit(v))
      ),
  )

In questa parte, creiamo una funzione definita dall’utente chiamata is_a_number per verificare se una stringa può essere convertita in float. Utilizziamo questa funzione per filtrare i valori non numerici nelle colonne “rating” e “calories” e quindi convertirli in tipo double.

Successivamente, limitiamo i valori delle variabili continue “calories”, “proteine”, “grassi” e “sodio” ai massimi specificati per gestire eventuali valori anomali.

# Calcolo della somma di ogni colonna binaria
somma_colonne_binarie = [
  F.sum(F.col(x)).alias(x) for x in COLONNE_BINARIE
]
# Selezione delle somme delle colonne binarie e conversione del risultato in un dizionario
somma_colonne_binarie = (
  food.select(*somma_colonne_binarie).head().asDict()
)
# Conteggio del numero totale di righe
numero_righe = food.count()
# Identificazione delle caratteristiche rare
caratteristiche_troppo_rare = [
  k
  for k, v in somma_colonne_binarie.items()
  if v < 10 or v > (numero_righe - 10)
]
# Esclusione delle caratteristiche rare dalle colonne binarie
COLONNE_BINARIE = list(set(COLONNE_BINARIE) - set(caratteristiche_troppo_rare))

Successivamente, calcoliamo la somma di ogni colonna binaria e convertiamo il risultato in un dizionario. Successivamente, identifichiamo le caratteristiche “rare” – quelle che sono vere meno di 10 volte o vere in tutte le istanze tranne meno di 10 – e le rimuoviamo dalle nostre colonne binarie.

# Creazione di nuove caratteristiche
food = food.withColumn(
  "rapporto_proteine", F.col("proteine") * 4 / F.col("calories")
).withColumn(
  "rapporto grassi", F.col("grassi") * 9 / F.col("calories")
)
# Gestione dei valori mancanti nelle nuove caratteristiche
food = food.fillna(0.0, subset=["rapporto_proteine", "rapporto grassi"])
# Aggiunta delle nuove caratteristiche alle colonne continue
COLONNE_CONTINUE += ["rapporto_proteine", "rapporto grassi"]

Qui creiamo nuove caratteristiche “protein_ratio” e “fat_ratio” che rappresentano il rapporto tra proteine e grassi rispetto alle calorie, rispettivamente. Riempire i valori mancanti in queste nuove caratteristiche con 0.0 e aggiungerle alle nostre colonne continue.

# Riempimento dei valori mancanti nelle colonne continue
VECCHIE_COLONNE = ["calorie", "proteine", "grassi", "sodio"]
NUOVE_COLONNE = ["calorie_i", "proteine_i", "grassi_i", "sodio_i"]
imputer = Imputer(
  strategy="mean",
  inputCols=VECCHIE_COLONNE,
  outputCols=NUOVE_COLONNE,
)
imputer_model = imputer.fit(cibo)
# Aggiornamento delle colonne continue
COLONNE_CONTINUE = (
  list(set(COLONNE_CONTINUE) - set(VECCHIE_COLONNE)) + NUOVE_COLONNE
)
# Applicazione del modello di imputazione ai dati
cibo = imputer_model.transform(cibo)

In questa sezione, riempiamo i valori mancanti nelle colonne “calorie”, “proteine”, “grassi” e “sodio” con i loro valori medi utilizzando l’Imputer di Spark. Quindi aggiorniamo la nostra lista di colonne continue per includere quelle riempite.

# Assemblaggio delle caratteristiche continue in un unico vettore
CONTINUOUS_NB = [x for x in COLONNE_CONTINUE if "ratio" not in x]
continuous_assembler = VectorAssembler(
  inputCols=CONTINUOUS_NB, outputCol="continue"
)
caratteristiche_cibo = continuous_assembler.transform(cibo)

Successivamente, utilizziamo il VectorAssembler per assemblare le nostre caratteristiche continue in una singola colonna vettoriale “continue”.

# Scalatura delle caratteristiche continue
continuous_scaler = MinMaxScaler(
  inputCol="continue",
  outputCol="continue_scalate",
)
caratteristiche_cibo = continuous_scaler.fit(caratteristiche_cibo).transform(
  caratteristiche_cibo
)

Infine, scaliamo le caratteristiche continue nell’intervallo [0, 1] utilizzando il MinMaxScaler, adattandolo ai nostri dati e trasformando i nostri dati. A questo punto, il nostro dataset è pronto per le attività di machine learning!

Ora siamo pronti per eseguire il lavoro di addestramento di Machine Learning.

Passaggio 3: Addestrare, Testare ed Valutare il Modello

Una volta che i dati sono stati elaborati e trasformati, possiamo dividerli in un set di addestramento e un set di test. Dopo aver addestrato il modello, possiamo quindi valutarne le prestazioni utilizzando vari indicatori. In questa sezione, costruiamo una pipeline di ML con gli estimatori che abbiamo utilizzato per il nostro programma di preparazione delle caratteristiche di previsione del dessert e aggiungiamo il passaggio di modellazione al mix.

from pyspark.ml import Pipeline
import pyspark.ml.feature as MF
imputer = MF.Imputer(
                            strategy="mean",
                            inputCols=["calorie", "proteine", "grassi", "sodio"],
                            outputCols=["calorie_i", "proteine_i", "grassi_i", "sodio_i"],
                          )
continuous_assembler = MF.VectorAssembler(
                                              inputCols=["valutazione", "calorie_i", "proteine_i", "grassi_i", "sodio_i"],
                                              outputCol="continue",
                                            )
continuous_scaler = MF.MinMaxScaler(
                                      inputCol="continue",
                                      outputCol="continue_scalate",
                                    )
cibo_pipeline = Pipeline(
                          stages=[imputer, continuous_assembler, continuous_scaler]
                        )

Possiamo assemblare il dataset finale con la colonna di tipo vettoriale.

preml_assembler = MF.VectorAssembler(
                                          inputCols=COLONNE_BINARIE
                                          + ["continue_scalate"]
                                          + ["protein_ratio", "fat_ratio"],
                                          outputCol="caratteristiche",
                                        )
cibo_pipeline.setStages(
                          [imputer, continuous_assembler, continuous_scaler, preml_assembler]
                        )
cibo_pipeline_model = cibo_pipeline.fit(cibo)
caratteristiche_cibo = cibo_pipeline_model.transform(cibo)

Il nostro dataframe è pronto per il machine learning! Abbiamo un certo numero di record, ognuno con

  • Una colonna target (o etichetta), dessert, contenente un input binario (1.0 se la ricetta è un dessert, 0.0 altrimenti)
  • Un vettore di caratteristiche, chiamato caratteristiche, contenente tutte le informazioni con cui vogliamo addestrare il nostro modello di machine learning

Possiamo visualizzare i risultati previsti:

caratteristiche_cibo.select("titolo", "dessert", "caratteristiche").show(30, truncate=30)

Andiamo addestrare ora un modello di apprendimento automatico utilizzando un classificatore LogisticRegression:

from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(
                          featuresCol="features", labelCol="dessert", predictionCol="prediction"
                      )
food_pipeline.setStages(
  [
      imputer,
      continuous_assembler,
      continuous_scaler,
      preml_assembler,
      lr,
  ]
)
# Dividiamo il nostro dataframe per addestramento e testing
train, test = food.randomSplit([0.7, 0.3], 13)
train.cache()
food_pipeline_model = food_pipeline.fit(train)
results = food_pipeline_model.transform(test)
Valutiamo ora il modello e osserviamo la matrice di confusione
results.select("prediction", "rawPrediction", "probability").show(3, False)
# Creiamo una matrice di confusione per il nostro modello utilizzando pivot()
results.groupby("dessert").pivot("prediction").count().show()
Infine, possiamo calcolare la precisione e il richiamo del nostro modello:
lr_model = food_pipeline_model.stages[-1]
metrics = lr_model.evaluate(results.select("title", "dessert", "features"))
print(f"Precisione del modello: {metrics.precisionByLabel[1]}")
print(f"Richiamo del modello: {metrics.recallByLabel[1]}")

Si prega di notare che lo script completo è stato semplificato per lo scopo di questo tutorial. Per una comprensione completa delle applicazioni pratiche, compreso un approfondimento dettagliato del codice dalla preparazione dei dati alla distribuzione del modello, vi invitiamo a partecipare alla conferenza ODSC APAC 2023.

Questo breve tutorial vi ha dato un assaggio di ciò che verrà trattato nella sessione ODSC. Partecipando alla sessione, avrete l’opportunità di esplorare questi argomenti in modo più approfondito e comprendere le complessità di PySpark MLlib. L’obiettivo principale è quello di fornire agli appassionati di data science e ai professionisti gli strumenti necessari per sfruttare appieno il potenziale di Spark MLlib nei loro progetti di apprendimento automatico.

Ricordatevi che la chiave per padroneggiare qualsiasi competenza risiede nell’apprendimento costante e nell’implementazione pratica. Quindi, preparatevi e immergetevi nel fascinante mondo dell’apprendimento automatico con Spark su AWS alla conferenza ODSC. Non vediamo l’ora di vedervi lì!

Informazioni sull’autore:

Suman Debnath è un Principal Developer Advocate (Data Engineering) presso Amazon Web Services, con un focus principale su Data Engineering, Data Analysis e Machine Learning. È appassionato di sistemi distribuiti su larga scala ed è un grande fan di Python. La sua esperienza è nel campo delle prestazioni di storage e nello sviluppo di strumenti, dove ha sviluppato vari strumenti di benchmarking e monitoraggio delle prestazioni.