Ottimizzazione della dimensione del file di output in Apache Spark

Ottimizzazione dimensione file output Apache Spark

Una Guida Completa sulla Gestione delle Partizioni, la Riforestazione e le Operazioni di Coalesce

Foto di zhao chen su Unsplash

Immaginati alla guida di una grande operazione di elaborazione dati Spark. Una regola spesso menzionata per ottimizzare Spark è che, per ottenere le migliori prestazioni di I/O e un’elaborazione parallela, ogni file di dati dovrebbe avere una dimensione di circa 128Mb, che è la dimensione predefinita delle partizioni quando si legge un file [1].

Immagina i tuoi file come navi che navigano nel mare dell’elaborazione dati. Se le navi sono troppo piccole, si spreca molto tempo per attraccare e salpare di nuovo, una metafora per il motore di esecuzione che impiega tempo extra ad aprire file, elencare directory, ottenere metadati degli oggetti, configurare il trasferimento dei dati e leggere i file. Al contrario, se le tue navi sono troppo grandi e non utilizzi molti moli del porto, devono aspettare un unico processo di caricamento e scaricamento lungo, una metafora per l’elaborazione delle query che attende finché un unico lettore ha finito di leggere l’intero file, riducendo la parallelizzazione [fig. 1].

Fig. 1 — Immagine dell'autore

Per illustrare vividamente l’importanza dell’ottimizzazione delle dimensioni dei file, si fa riferimento alla seguente figura. In questo esempio specifico, ogni tabella contiene 8 GB di dati.

Tuttavia, navigare in questo delicato equilibrio non è un compito facile, specialmente quando si lavora con grandi lotti di dati. Potresti sentire di aver perso il controllo sul numero di file di output. Questa guida ti aiuterà a riacquistarlo.

La Chiave per Capire: le Partizioni

Il numero di file di output salvati sul disco è uguale al numero di partizioni negli esecutori Spark quando viene eseguita l’operazione di scrittura. Tuttavia, valutare il numero di partizioni prima di eseguire l’operazione di scrittura può essere complicato.

Quando si legge una tabella, Spark legge blocchi con una dimensione massima di 128Mb (anche se è possibile cambiarlo con sql.files.maxPartitionBytes). Pertanto, il numero di partizioni dipende dalla dimensione dell’input. Tuttavia, nella realtà, il numero di partizioni sarà molto probabilmente uguale al parametro sql.shuffle.partitions. Questo numero predefinito è 200, ma per carichi di lavoro più grandi, di solito non è sufficiente. Guarda questo video per imparare come impostare il numero ideale di partizioni di shuffle.

Il numero di partizioni negli esecutori Spark è uguale a sql.shuffle.partitions se c’è almeno una trasformazione ampia nell’ETL. Se vengono applicate solo trasformazioni strette, il numero di partizioni corrisponderà al numero creato durante la lettura del file.

Impostando il numero di partizioni di shuffle otteniamo un controllo di alto livello sulle partizioni totali solo quando si lavora con tabelle non partizionate. Una volta entrati nel territorio delle tabelle partizionate, cambiare il parametro sql.shuffle.partitions non influenzerà facilmente la dimensione di ciascun file di dati.

Il Volante: Riforestazione e Coalesce

Abbiamo due modi principali per gestire il numero di partizioni durante l’esecuzione: repartition() e coalesce(). Ecco una rapida panoramica:

  • Riforestazione: repartition(partitionCols, n_partitions) è una trasformazione lazy con due parametri – il numero di partizioni e la colonna/i di partizionamento. Quando viene eseguita, Spark rimescola le partizioni nel cluster in base alla colonna di partizionamento. Tuttavia, una volta che la tabella è salvata, le informazioni sulla riforestazione vengono perse. Pertanto, queste utili informazioni non saranno utilizzate durante la lettura del file.
df = df.repartition("nome_colonna", n_partizioni)
  • Coalesce: coalesce(num_partitions) è anche una trasformazione ritardata, ma prende solo un argomento – il numero di partizioni. È importante notare che l’operazione di coalesce non redistribuisce i dati nel cluster – quindi è più veloce di repartition. Inoltre, coalesce può solo ridurre il numero di partizioni, non funzionerà se si cerca di aumentare il numero di partizioni.
df = df.coalesce(num_partitions)

Il punto principale da comprendere qui è che l’utilizzo del metodo coalesce è generalmente più vantaggioso. Ciò non significa che la ridistribuzione non sia utile; lo è sicuramente, specialmente quando abbiamo bisogno di regolare il numero di partizioni in un dataframe durante l’esecuzione.

Nella mia esperienza con i processi ETL, in cui lavoro con più tabelle di dimensioni diverse e svolgo trasformazioni e join complessi, ho scoperto che sql.shuffle.partitions non offre il controllo preciso di cui ho bisogno. Ad esempio, utilizzare lo stesso numero di partizioni per il join di due tabelle piccole e due tabelle grandi nello stesso ETL sarebbe inefficiente – porterebbe a un eccesso di piccole partizioni per le tabelle piccole o a un numero insufficiente di partizioni per le tabelle grandi. La ridistribuzione offre anche il vantaggio aggiuntivo di aiutarmi a evitare problemi con join sbilanciati e dati sbilanciati [2].

Detto ciò, la ridistribuzione è meno adatta prima di scrivere la tabella su disco e nella maggior parte dei casi può essere sostituita con coalesce. Coalesce ha il vantaggio rispetto alla ridistribuzione prima di scrivere su disco per un paio di motivi:

  1. Previeni una redistribuzione non necessaria dei dati nel cluster.
  2. Consente di ordinare i dati secondo un euristico logico. Quando si utilizza il metodo di ridistribuzione prima di scrivere, i dati vengono redistribuiti nel cluster, causando la perdita del loro ordine. D’altra parte, utilizzando coalesce si mantiene l’ordine dei dati poiché vengono raccolti insieme anziché essere redistribuiti.

Vediamo perché l’ordinamento dei dati è cruciale.

Ordine all’orizzonte: importanza dell’ordinamento dei dati

Abbiamo accennato in precedenza a come quando applichiamo il metodo repartition, Spark non salva le informazioni sulla suddivisione nelle metadati della tabella. Tuttavia, quando si lavora con big data, questa è una informazione cruciale per due motivi:

  1. Consente di scansionare la tabella molto più velocemente durante la query.
  2. Consente una migliore compressione – se si lavora con un formato compressibile (come parquet, CSV, Json, ecc). Questo è un ottimo articolo per capire perché.

Il punto chiave è ordinare i dati prima di salvarli. Le informazioni verranno conservate nei metadati e verranno utilizzate durante le query, rendendo la query molto più veloce.

Vediamo ora le differenze tra il salvataggio in una tabella non partizionata e una tabella partizionata e perché il salvataggio in una tabella partizionata richiede alcuni aggiustamenti aggiuntivi.

Gestione delle dimensioni dei file in tabelle partizionate

Per quanto riguarda le tabelle non partizionate, la gestione del numero di file durante l’operazione di salvataggio è un processo diretto. Utilizzando il metodo coalesce prima del salvataggio si otterrà il risultato desiderato, indipendentemente dal fatto che i dati siano ordinati o meno.

# Esempio di utilizzo del metodo coalesce prima del salvataggio di una tabella non partizionata
df.coalesce(10).write.format("parquet").save("/percorso/di/destinazione")

Tuttavia, questo metodo non è efficace nel caso delle tabelle partizionate, a meno che i dati non siano ordinati prima della riduzione. Per comprendere perché ciò accade, è necessario approfondire le azioni che si svolgono all’interno degli executor di Spark quando i dati sono ordinati rispetto a quando non lo sono [fig.2].

Fig. 2 — Immagine dell'autore

Pertanto, il processo standard per salvare i dati in una tabella partizionata dovrebbe essere il seguente:

# Esempio di utilizzo del metodo coalesce dopo aver ordinato i dati in una tabella partizionata
df.orderBy("nomeColonna").coalesce(10).write.format("parquet").save("/percorso/di/destinazione_partizionata")

Altri Aiuti Navigazionali

Oltre a repartition e coalesce, potresti trovare utile maxnumberofrecords. È un metodo pratico per evitare che i file diventino troppo grandi e può essere utilizzato insieme ai metodi sopra citati.

df.write.option("maxRecordsPerFile", 50000).save("file_path")

Considerazioni Finali

Dominare la dimensione dei file in un job Spark spesso comporta tentativi ed errori. È facile trascurare l’ottimizzazione in un’epoca in cui lo spazio di archiviazione è economico e la potenza di elaborazione è a portata di clic. Ma quando il processamento di terabyte e petabyte di dati diventa la norma, dimenticare queste semplici tecniche di ottimizzazione può avere costi significativi in termini monetari, temporali ed ambientali.

Spero che questo articolo ti permetta di apportare modifiche efficienti ai tuoi processi ETL. Come un capitano di mare navigato, possa tu attraversare le acque di Spark con fiducia e chiarezza.