Parallelizzare Python su Spark Opzioni per la concorrenza con Pandas

Parallelizzare Python su Spark Opzioni per la concorrenza con Pandas

Sfrutta i benefici di Spark quando lavori con Pandas

Foto di Florian Steciuk su Unsplash

Nel mio ruolo precedente, ho passato del tempo a lavorare su un progetto interno per prevedere l’utilizzo futuro dello spazio di archiviazione su disco per i nostri clienti dei Managed Services su migliaia di dischi. Ogni disco è soggetto ai propri modelli di utilizzo e ciò significa che abbiamo bisogno di un modello di machine learning separato per ogni disco che utilizza dati storici per prevedere l’utilizzo futuro su base disco per disco. Mentre eseguire questa previsione e scegliere l’algoritmo corretto per il lavoro è una sfida di per sé, eseguire tutto ciò su larga scala presenta i suoi problemi.

Per sfruttare un’infrastruttura più sofisticata, possiamo cercare di muoverci lontano dalle previsioni sequenziali e velocizzare l’operazione della previsione parallelizzando il lavoro. Questo articolo ha lo scopo di confrontare le Pandas UDF e il modulo ‘concurrent.futures’, due approcci di elaborazione concorrente, e determinare casi d’uso per ciascuno.

La sfida

Pandas è un pacchetto gateway in Python per lavorare con dataset nell’ambito dell’analisi. Attraverso il lavoro con i DataFrames, siamo in grado di profilare i dati e valutarne la qualità, effettuare analisi esplorative dei dati, creare visualizzazioni descrittive dei dati e prevedere le tendenze future.

Anche se è certamente uno strumento eccezionale, la natura single-threaded di Python significa che può avere problemi di scalabilità quando si lavora con set di dati più grandi o quando è necessario eseguire la stessa analisi su più subset di dati.

Nel mondo dei big data, ci aspettiamo un po’ più di sofisticazione nel nostro approccio, poiché abbiamo un focus aggiuntivo sulla scalabilità per mantenere ottime performance. Spark, tra gli altri linguaggi, ci consente di sfruttare l’elaborazione distribuita per aiutarci a processare strutture dati più grandi e complesse.

Prima di analizzare questo esempio specifico, possiamo generalizzare alcuni casi d’uso che riassumono la necessità di concorrenza nell’elaborazione dei dati:

  • Applicare trasformazioni uniformi a più file di dati
  • Prevedere valori futuri per diversi subset di dati
  • Modificare iperparametri per il modello di machine learning e selezionare la configurazione più efficiente

Quando incrementiamo il requisito di eseguire carichi di lavoro come quelli suggeriti sopra e nel nostro caso, l’approccio più semplice in Python e Pandas è elaborare questi dati in modo sequenziale. Nel nostro esempio, eseguiremmo il flusso sopra per un disco alla volta.

I dati

Nel nostro esempio, abbiamo dati per migliaia di dischi che mostrano lo spazio disponibile registrato nel tempo e vogliamo prevedere i valori futuri dello spazio libero per ciascun disco.

Per rendere l’immagine più chiara, ho fornito un file csv contenente 1.000 dischi, ognuno con un mese di dati storici per lo spazio libero misurato in GB. Questo è di dimensioni sufficienti per vedere l’impatto dei diversi approcci di previsione su larga scala.

Immagine dell'autore: Esempio di DataFrame

Per un problema di serie temporali come questo, cerchiamo di utilizzare i dati storici per prevedere le tendenze future e vogliamo capire quale algoritmo di machine learning sarà più appropriato per ciascun disco. Strumenti come AutoML sono ottimi per questo quando si cerca di determinare il modello appropriato per un dataset, ma qui stiamo lavorando con 1.000 dataset, quindi è eccessivo per il nostro confronto.

In questo caso, limiteremo il numero di algoritmi che vogliamo confrontare a due e vedremo quale è il modello più appropriato da utilizzare, per ciascun disco, utilizzando l’errore quadratico medio (RMSE) come metrica di validazione. Ulteriori informazioni su RMSE possono essere trovate qui . Questi algoritmi sono:

  • Regressione lineare
  • Fbprophet (adattamento dei dati a una linea più complessa)
  • Modello di previsione delle serie storiche di Facebook.
  • Realizzato per previsioni più complesse con iperparametri per la stagionalità.

Ora abbiamo tutti i componenti pronti se volessimo prevedere lo spazio libero futuro di un singolo disco. Le azioni seguono il flusso sottostante:

Immagine dell'autore: Data Lifecycle

Ora vogliamo espandere questo, eseguendo questo flusso per più dischi, 1.000 nel nostro esempio.

Come parte della nostra revisione, confronteremo le prestazioni nel calcolo dei valori di RMSE per gli algoritmi diversi a diverse scale. A tal proposito, ho creato un sottoinsieme dei primi 100 dischi per simulare questo.

Ciò dovrebbe fornire alcuni interessanti spunti sulle prestazioni su set di dati di dimensioni diverse, eseguendo operazioni di complessità variabile.

Introduzione alla concorrenza

Python è famosamente single-threaded e quindi non utilizza tutte le risorse di calcolo disponibili in un determinato momento.

Di conseguenza, ho individuato tre opzioni:

  1. Implementare un ciclo for per calcolare le previsioni in modo sequenziale, adottando un approccio single-threaded.
  2. Utilizzare il modulo “futures” di Python per eseguire più processi contemporaneamente.
  3. Utilizzare le UDF (user-defined functions) di Pandas per sfruttare l’elaborazione distribuita in PySpark mantenendo la sintassi di Pandas e i pacchetti compatibili.

Volevo fare un confronto abbastanza approfondito in diverse condizioni ambientali, quindi ho utilizzato un cluster Databricks a singolo nodo e un altro cluster Databricks con 4 nodi worker per sfruttare Spark per il nostro approccio con le UDF di Pandas.

Seguiremo il seguente approccio per valutare l’adeguatezza dei modelli di regressione lineare e fbprophet per ogni disco:

  • Dividere i dati in set di addestramento e test
  • Utilizzare il set di addestramento come input e prevedere le date del set di test
  • Confrontare i valori previsti con i valori effettivi nel set di test per ottenere un punteggio di Root Mean Squared Error (RMSE)

Ritorneremo due cose nei nostri output: un DataFrame modificato con le previsioni, che ci darà il vantaggio aggiuntivo di tracciare e confrontare i valori previsti con quelli effettivi, e un DataFrame contenente i punteggi RMSE per ogni disco e algoritmo.

Le funzioni per farlo hanno l’aspetto seguente:

Confronteremo i tre approcci descritti sopra. Abbiamo diverse situazioni diverse, quindi possiamo compilare una tabella di quali risultati stiamo raccogliendo:

Con le seguenti combinazioni:

Metodo

  • Sequenziale
  • futures
  • Pandas UDFs

Algoritmo

  • Regressione lineare
  • Fbprophet
  • Combinato (entrambi gli algoritmi per ogni disco) – modo più efficiente per ottenere un confronto.

Modalità cluster

  • Cluster a singolo nodo
  • Cluster standard con 4 worker

Numero di dischi

  • 100
  • 1000

I risultati sono presentati in questo formato nell’appendice di questo blog, nel caso tu voglia dare un’occhiata più approfondita.

I Metodi

Metodo 1: Sequenziale

Metodo 2: concurrent.futures

Ci sono due opzioni nell’utilizzo di questo modulo: parallelizzare operazioni ad alta intensità di memoria (utilizzando ThreadPoolExecutor) o operazioni ad alta intensità di CPU (ProcessPoolExecutor). Una spiegazione descrittiva di ciò si trova nel seguente blog. Dal momento che lavoreremo su un problema ad alta intensità di CPU, ProcessPoolExecutor è adatto a ciò che stiamo cercando di ottenere.

Metodo 3: Pandas UDFs

Ora cambieremo marcia e utilizzeremo Spark e sfrutteremo l’elaborazione distribuita per migliorare la nostra efficienza. Dal momento che stiamo usando Databricks, la maggior parte della configurazione di Spark è già stata fatta per noi, ma ci sono alcune modifiche alla nostra gestione generale dei dati.

Prima di tutto, importiamo i dati in un DataFrame PySpark:

Utilizzeremo la Pandas grouped map UDF (PandasUDFType.GROUPED_MAP), poiché vogliamo passare un DataFrame e restituire un DataFrame. Dal momento che Apache Spark 3.0 non è più necessario dichiarare esplicitamente questo decorator!

Dobbiamo separare le nostre funzioni fbprophet, regression e RMSE per le Pandas UDF a causa della strutturazione del DataFrame in PySpark, ma non richiediamo un completo rifacimento del codice per raggiungerlo.

Possiamo quindi utilizzare applyInPandas per produrre i nostri risultati.

Nota: gli esempi sopra stanno solo dimostrando il processo per l’utilizzo della Regressione Lineare per una migliore leggibilità. Si prega di consultare il quaderno completo per la dimostrazione completa di questo.

Interpretazione dei risultati

Abbiamo creato grafici per i diversi metodi e le diverse configurazioni dell’ambiente, quindi abbiamo raggruppato i dati per algoritmo e numero di dischi per una facile comparazione.

Si prega di notare che i risultati tabulari si trovano nell’allegato di questo post.

Ho riassunto le principali conclusioni di questi risultati di seguito:

  • Come previsto, prevedere 1.000 dischi rispetto a 100 dischi è (in generale) un processo più lungo.
  • L’approccio sequenziale è generalmente il più lento, non riuscendo a sfruttare le risorse sottostanti in modo efficiente.
  • Le Pandas UDF sono piuttosto inefficienti per i compiti più piccoli e semplici. Il costo di trasformazione dei dati è più elevato, la parallelizzazione contribuisce a compensare questo costo.
  • Sia l’approccio sequenziale che quello con concurrent.futures non sono consapevoli del clustering disponibile in Databricks, perdendo ulteriori risorse di calcolo.

Riflessioni finali

Il contesto svolge sicuramente un ruolo importante in quale approccio risulta più efficace, ma dato che Databricks e Spark sono spesso utilizzati per problemi di Big Data, possiamo vedere i benefici dell’utilizzo delle Pandas UDF con quei dataset più grandi e complessi che abbiamo visto qui oggi.

L’utilizzo di un ambiente Spark per dataset più piccoli può essere fatto con la stessa efficienza su una configurazione di calcolo più piccola (e meno costosa!) come dimostrato dall’uso del modulo concurrent.futures, quindi tenere presente questo quando si progetta la soluzione.

Se conosci Python e Pandas, entrambi gli approcci non dovrebbero essere una grande sfida di apprendimento per allontanarsi dall’approccio sequenziale con ciclo for visto nei tutorial per principianti.

Non l’abbiamo approfondito in questo post poiché ho riscontrato discrepanze e incompatibilità con la versione attuale, ma il recente modulo pyspark.pandas sarà certamente più comune in futuro, ed è un approccio da tenere d’occhio. Questa API (insieme a Koalas, sviluppato dai ragazzi di Databricks, ma ora ritirato) sfrutta la familiarità di Pandas con i vantaggi sottostanti di Spark.

Per dimostrare l’effetto che stiamo cercando di ottenere, siamo arrivati solo a guardare i valori di RMSE prodotti per ogni disco, anziché prevedere effettivamente un set di valori di serie temporale futura. Il framework che abbiamo messo in piedi qui può essere applicato allo stesso modo per questo, con una logica per determinare se la metrica di valutazione (insieme ad altre logiche, come le limitazioni fisiche di un disco) sia appropriata in ogni caso e per prevedere i valori futuri, se possibile, utilizzando l’algoritmo determinato.

Come sempre, il quaderno può essere trovato sul mio GitHub.

Allegato

Originariamente pubblicato su https://blog.coeo.com, adattato per questa ripubblicazione.