Implementazione di ParDo e DoFn in dettaglio in Apache Beam

Implementazione di ParDo e DoFn in Apache Beam

Foto di ODISSEI su Unsplash

Spiegazione dettagliata del codice per principianti

Ho scritto un tutorial su alcune comuni funzioni di trasformazione in Apache Beam in un tutorial precedente che ha coperto map, filter e combinePerKey(). Questo tutorial sarà per la trasformazione ParDo, che non è altro che un altro modo di fare Map. Ma la differenza è che ParDo applica la trasformazione in ogni PCollection e restituisce zero o più elementi all’output PCollection. D’altra parte, la trasformazione Map restituisce esattamente un elemento per ogni elemento di input. In questo modo, ParDo ci fornisce molta flessibilità nel lavoro.

Un altro aspetto importante della trasformazione Pardo è che richiede il codice utente sotto forma di DoFn. Vediamo alcuni esempi.

Per favore, sentiti libero di scaricare questo dataset pubblico e seguire insieme:

Dati di vendita di esempio | Kaggle

Ho utilizzato un quaderno Google Colab per lavorare su questo codice, quindi è molto facile da installare. Ecco il codice per installarlo:

!pip install --quiet apache_beam

Ho creato una directory chiamata ‘data’ per mettere il file CSV che utilizzeremo e per mettere gli output del nostro esercizio di oggi.

mkdir -p data

Per iniziare, lavorerò solo sulla cosa più semplice del dataset. Leggere il dataset e creare una lista di ogni riga nel dataset e restituirle in un file di testo.

Leggere un file di testo in un pipeline beam è molto semplice e diretto. Abbiamo un file CSV. Quindi. definiremo una classe CustomCoder() per questo che codifica gli oggetti in una stringa di byte prima, quindi decodifica i byte nei relativi oggetti, e infine specifica se il coder è garantito per codificare i valori in modo deterministico. Ecco la documentazione per il coder.

from apache_beam.coders.coders import Coderclass CustomCoder(Coder):    """Un coder personalizzato usato per leggere e scrivere stringhe come UTF-8."""    def encode(self, value):        return value.encode("utf-8", "replace")    def decode(self, value):        return value.decode("utf-8", "ignore")    def is_deterministic(self):        return True

C’è anche la classe SplitRow() che utilizza semplicemente la funzione .split() di Python.

class SplitRow(beam.DoFn):  def process(self, element)…