Implementazione di ParDo e DoFn in dettaglio in Apache Beam
Implementazione di ParDo e DoFn in Apache Beam
Spiegazione dettagliata del codice per principianti
Ho scritto un tutorial su alcune comuni funzioni di trasformazione in Apache Beam in un tutorial precedente che ha coperto map, filter e combinePerKey(). Questo tutorial sarà per la trasformazione ParDo, che non è altro che un altro modo di fare Map. Ma la differenza è che ParDo applica la trasformazione in ogni PCollection e restituisce zero o più elementi all’output PCollection. D’altra parte, la trasformazione Map restituisce esattamente un elemento per ogni elemento di input. In questo modo, ParDo ci fornisce molta flessibilità nel lavoro.
Un altro aspetto importante della trasformazione Pardo è che richiede il codice utente sotto forma di DoFn. Vediamo alcuni esempi.
Per favore, sentiti libero di scaricare questo dataset pubblico e seguire insieme:
- Rivoluzionare la robotica una pinza stampata in 3D che funziona senza elettronica
- Generare dati sintetici con Python
- Tutti i Grandi Modelli di Linguaggio (LLM) che dovresti conoscere nel 2023
Dati di vendita di esempio | Kaggle
Ho utilizzato un quaderno Google Colab per lavorare su questo codice, quindi è molto facile da installare. Ecco il codice per installarlo:
!pip install --quiet apache_beam
Ho creato una directory chiamata ‘data’ per mettere il file CSV che utilizzeremo e per mettere gli output del nostro esercizio di oggi.
mkdir -p data
Per iniziare, lavorerò solo sulla cosa più semplice del dataset. Leggere il dataset e creare una lista di ogni riga nel dataset e restituirle in un file di testo.
Leggere un file di testo in un pipeline beam è molto semplice e diretto. Abbiamo un file CSV. Quindi. definiremo una classe CustomCoder() per questo che codifica gli oggetti in una stringa di byte prima, quindi decodifica i byte nei relativi oggetti, e infine specifica se il coder è garantito per codificare i valori in modo deterministico. Ecco la documentazione per il coder.
from apache_beam.coders.coders import Coderclass CustomCoder(Coder): """Un coder personalizzato usato per leggere e scrivere stringhe come UTF-8.""" def encode(self, value): return value.encode("utf-8", "replace") def decode(self, value): return value.decode("utf-8", "ignore") def is_deterministic(self): return True
C’è anche la classe SplitRow() che utilizza semplicemente la funzione .split() di Python.
class SplitRow(beam.DoFn): def process(self, element)…