Il framework di pipeline dati più piccolo del mondo

Il framework di pipeline dati più compatto del mondo

Una fondazione di una pipeline dati semplice e veloce con funzionalità sofisticate.

Foto di Ana Lucia Cottone su Unsplash

La manipolazione dei dati è forse il lavoro che richiede più tempo agli scienziati dei dati. La manipolazione dei dati include la pulizia, la trasformazione e la manipolazione generale dei dati dal loro stato grezzo in qualcosa di utile. Come molte attività, il processo di manipolazione spesso ha bisogno di essere affinato nel tempo. Pertanto, è importante tenere traccia di come un set di dati viene manipolato in modo che il tuo team possa gestire e riprodurre il processo nel tempo. La manipolazione dei dati, sebbene non sempre divertente, può essere l’attività più importante in qualsiasi azienda moderna.

Ci sono alcune aziende specializzate in pipelin e di dati, e possono essere complesse e molto sofisticate. Ma per questa esplorazione, consideriamo il compito di trasformare un file di testo in un insieme di parole o “token”, eliminando i testi che non ci sono utili. Iniziamo in modo semplice e procediamo gradualmente.

Inizialmente, definiamo una serie di passaggi per eseguire funzioni di manipolazione delle parole in un testo. Utilizzeremo la funzione Python text.translate() per fare parte del lavoro per noi. Consideriamo queste 4 funzioni:

import stringdef step1(parola):    trans = str.maketrans("", "", string.punctuation)    return parola.replace("\n", " ").translate(trans)def step2(parola):    return parola.lower()def step3(parola):    trans = str.maketrans("", "", "0123456789")    return parola.replace("\n", " ").translate(trans)def step4(parola):    return (all([carattere in string.ascii_letters for carattere in parola]) and             len(parola) > 0)

step1 è una funzione che rimuove tutta la punteggiatura da una parola e rimuove le nuove righe. step2 trasforma una parola in minuscolo. step3 utilizza nuovamente text.translate() per rimuovere i numeri. E step4 verrà utilizzato come filtro per filtrare le parole che contengono lettere non ASCII. Puoi immaginare ulteriori passaggi, come la riduzione delle parole.

Poiché si tratta di funzioni semplici, se applichiamo step1 a una parola, otterremo:

>>> step1("Testing---123;")'Testing123'

Effettivamente, ha rimosso la punteggiatura dal testo. Possiamo applicare tutte e tre le funzioni avvolgendole a matrioska intorno alla parola:

>>> step3(step2(step1("Testing---123;")))'testing'

Qui vediamo che le funzioni step1, step2 e step3 sono state applicate lasciando solo le lettere “testing”. Nota che definiremo le nostre funzioni per lavorare in un ordine specifico. Vale a dire, step1 dovrebbe essere fatto prima di step2, ecc.

Questo processo basato su funzioni è semplice da creare e semplice da usare. Certo, potremmo fare tutte le funzioni in una volta. Ma poiché la “pipeline” di funzioni diventa sempre più lunga e complessa, suddividere il processo in passaggi distinti renderà il processo più gestibile. Infatti, ogni passaggio potrebbe diventare così complesso da richiedere squadre diverse che lavorano su di essi.

Otteniamo, quindi, buoni risultati fino ad ora. Ma ovviamente, non vogliamo applicare manualmente la pipeline di funzioni su ogni parola. Invece vogliamo applicarla a ogni parola in un elenco. Per fare ciò, creiamo una funzione molto semplice applica():

def applica(passaggio, valori):    return [passaggio(valore) for valore in valori]

Ora possiamo utilizzare le stesse funzioni su interi elenchi di parole:

>>> applica(step3,           applica(step2,                 applica(step1,                       ["Testing---123;", "456---", "Ciao!"])))['testing', '', 'ciao']

Ah, sì, dobbiamo rimuovere le parole vuote. step4 è stato progettato appositamente per questo, ma è un po’ più complesso da usare. Avrebbe questo aspetto:

>>> elenco_filtrato = list(filter(step4, applica(step3, applica(step2, applica(step1, ["Testing---123;", "456---", "Ciao!"])))))
['testing', 'hello']

Questo perché step4 è una funzione filtro che restituisce True per mantenerla e False per rimuoverla, e viene applicata in questo modo: filter(step4, dati).

Ci sono alcuni problemi con questo approccio semplice:

  1. Le fasi vengono applicate dall’interno verso l’esterno. Quindi, la prima fase, step1, è la funzione più interna, mentre step3 è all’esterno. Non molto intuitivo.
  2. È molto verboso poiché dobbiamo ripetere la funzione applica() per ogni funzione di fase.
  3. I filtri (come step4) non possono essere usati come le altre funzioni.

Tenendo conto di questi problemi, possiamo astrarre la funzionalità principale in un sistema di pipeline generalizzato? Immagino un approccio a due fasi:

# Prima creiamo una funzione di pipeline:p = mia_pipeline(step1, step2, step3)# E poi la applichiamo a un set di dati:p(["Testing---123;", "456---", "Ciao!"])

Come potremmo definire mia_pipeline? Si rivela piuttosto semplice:

def mia_pipeline(*fasi):    def wrapper(input):        for fase in fasi:            input = applica(fase, input)        return input    return wrapper

Quindi, mia_pipeline è una funzione che prende una serie di funzioni di fase e restituisce una funzione che prende una lista di parole, applica ogni fase nella serie e restituisce la lista di parole processata.

Proviamolo:

>>> p = mia_pipeline(step1, step2, step3)>>> p(["Testing---123;", "456---", "Ciao!"])['testing', '', 'hello']

Funziona – abbiamo ottenuto esattamente quello che avevamo ottenuto prima! Cosa succede alla funzione filtro step4? Lasciamola per il momento e proviamo questo sistema con dati “reali”. Beh, saranno dati falsi reali. Per questi esperimenti, creeremo 10.000 documenti, ognuno composto da 10 paragrafi. Utilizzeremo il Generatore di Documenti() del pacchetto Python essential_generators.

from essential_generators import DocumentGeneratorimport osdocgen = DocumentGenerator()def genera_documenti(    count=10_000,     paragraphs=10,     output_folder="documents",     overwrite=False):    os.makedirs(output_folder, exist_ok=True)    for n in range(count):        filename = os.path.join(            output_folder,             "doc_%05d.txt" % (n + 1)        )        if overwrite or not os.path.exists(filename):            with open(filename, "w") as fp:                for p in range(paragraphs):                    fp.write(docgen.paragraph() + "\n\n")genera_documenti()

Ci vorranno circa 30 secondi per generare tutti i dati. Per continuare con il nostro codice semplice, dobbiamo introdurre un altro passaggio:

def passo0(nome_file):    return open(nome_file).read().split(" ")

Questo passo prenderà un nome di file, aprirà il file e dividerà il testo in spazi. E dobbiamo apportare una piccola modifica alla nostra funzione applica() per gestire liste di parole anziché singole parole:

def applica(passo, output):    return (passo(input) if not isinstance(input, list) else             [passo(i) for i in input] for input in output)

Ho anche apportato un’altra piccola modifica a applica: ora restituisce un’espressione di generatore anziché una comprehensions lista utilizzando le parentesi esterne invece delle parentesi quadre. Ciò ritarderà l’elaborazione fino a quando necessaria (a volte chiamata “valutazione pigra”).

Ora possiamo creare un sistema di pipeline quasi completo:

p = mia_pipeline(passo0, step1, step2, step3)list(p(["documents/doc_00001.txt"]))

Nota che richiede una lista di nomi dei file come input. Bello e semplice. Ma ci sono ancora alcune cose che mi piacerebbe vedere:

  1. la capacità di gestire i filtri in modo semplice
  2. la capacità di eseguire il pipeline in parallelo per elaborare rapidamente i dataset
  3. la capacità di visualizzare il pipeline

Per queste tre aggiunte, ti rimando al progetto picopipe che ho sviluppato basato sulle idee sopra citate. Puoi installarlo tramite pip:

pip install picopipe

e eseguirlo con le stesse funzioni di step di prima:

from picopipe import pipeline, pfilterp = pipeline(step0, step1, step2, step3, pfilter(step4))list(p(["documents/doc_00001.txt"])[0])

Qui, pfilter sta per filtro-pipeline, e lo avvolgi semplicemente intorno alla funzione step4. Sono abbastanza soddisfatto del design. Ma vediamo quanto sarà veloce.

Prima di tutto, otteniamo tutti i nomi dei documenti. Un modo facile per farlo è utilizzare glob:

import globdataset = glob.glob("documents/doc_*.txt")

E ora possiamo elaborare tutti i documenti:

results = list(p(dataset))

Questo richiede circa 21 secondi sul mio laptop per elaborare tutti i 10.000 documenti. Breve e dolce! Possiamo renderlo più veloce?

Sì! Ora c’è anche un parametro n_jobs per la pipe che indica il numero di job che puoi eseguire in parallelo. Ecco un po’ di codice che elaborerà il dataset più volte utilizzando da 0 a 9 thread. Quanto più veloce pensi che verrà eseguito utilizzando 9 thread in parallelo?

import timex = []y = []for i in range(10):    start = time.time()    results = list(p(dataset, n_jobs=i))    total_time = time.time() - start    x.append(i)    y.append(total_time)

Ci vorranno un paio di minuti. Graficando il tempo risultante rispetto ai thread:

Grafico che mostra il tempo di esecuzione della divisione del processo in un certo numero di job paralleli. Immagine dell'autore.

Interessante: il grafico si stabilizza anziché continuare a diminuire con thread aggiuntivi. Cioè, utilizzare 9 thread non è 9 volte più veloce dell’utilizzo di 1 thread. Perché no? Purtroppo, non puoi infrangere la legge. E c’è una legge: la legge di Amdahl. Sostanzialmente dice che non otterrai mai N volte più velocità perché c’è un costo di overhead che non può essere ridotto. In questo caso, posso ridurre il tempo da circa 21 secondi a 8 secondi utilizzando 4 thread. Non male!

Infine, mi piacerebbe visualizzare il pipeline. Per questa parte del progetto ho scelto di provare il formato Mermaid Diagram. Ha guadagnato molto supporto ultimamente, inclusi nei repository di GitHub. Il formato è molto semplice e facile da creare. Per il rendering di GitHub, basta dare al file l’estensione .mmd. Ecco come generare uno script Mermaid utilizzando picopipe:

from picopipe import to_mermaidwith open("pipeline.mmd", "w") as fp:    fp.write(to_mermaid(p))

Ecco come viene mostrato nel rendering di GitHub:

GitHub.com supporta direttamente i file di documento Mermaid. Immagine dell'autore.

Sfortunatamente, github non mostra la funzionalità di mouseover (definita in CSS). Tuttavia, se puoi impostare il tuo CSS, allora funziona non solo per visualizzare la pipeline, ma può mostrare il codice di un passo quando si passa il mouse sopra una casella del passo:

Un diagramma di una sirena come mostrato nei pannelli personalizzati di Comet. Immagine dell'autore.

Il grafico di Mermaid sopra con supporto al mouseover è stato creato utilizzando il sistema di pannelli personalizzati di Comet (gratuito per tutti gli utenti). È stato molto facile creare un pannello personalizzato che visualizza i file di Mermaid. Ecco una demo del grafico di Mermaid sopra, reso in tempo reale: comet.com/dsblank/picopipe/a4c044c1657b464087ec44f67ae22709

In questo modo, completiamo la nostra esplorazione dello sviluppo del Framework di Data Pipeline più piccolo del mondo e della sua parallelizzazione e visualizzazione. Puoi trovare tutto il codice qui: github.com/dsblank/picopipe. Spero che tu abbia trovato utile le idee presentate qui e il modulo finale.

Interessato all’Intelligenza Artificiale, all’Apprendimento Automatico o alla Scienza dei Dati? Considera un applauso e un follow. Doug è il Responsabile della Ricerca presso comet.com, un’azienda specializzata nel monitoraggio e tracciamento sperimentale dell’apprendimento automatico.