Ottimizzazione del processo di elaborazione dei dati ETL su Talent.com con Amazon SageMaker

Ottimizzazione del processo ETL su Talent.com utilizzando Amazon SageMaker

Questo post è scritto in collaborazione da Anatoly Khomenko, Ingegnere di Machine Learning, e Abdenour Bezzouh, Chief Technology Officer presso Talent.com.

Fondata nel 2011, Talent.com aggrega annunci di lavoro a pagamento dai loro clienti e annunci di lavoro pubblici, creando così una piattaforma unificata e facilmente accessibile. Coprendo oltre 30 milioni di annunci di lavoro in oltre 75 paesi e in molteplici lingue, settori e canali di distribuzione, Talent.com soddisfa le diverse esigenze dei cercatori di lavoro, connettendo efficacemente milioni di cercatori di lavoro con opportunità di lavoro.

La missione di Talent.com è facilitare le connessioni globali del mercato del lavoro. Per raggiungere questo obiettivo, Talent.com aggrega annunci di lavoro da diverse fonti sul web, offrendo ai cercatori di lavoro l’accesso a un vasto pool di oltre 30 milioni di opportunità di lavoro adatte alle loro competenze ed esperienze. In linea con questa missione, Talent.com ha collaborato con AWS per sviluppare un avanzato motore di raccomandazione di lavoro basato sull’apprendimento profondo, mirato ad aiutare gli utenti a progredire nella propria carriera.

Per garantire il corretto funzionamento di questo motore di raccomandazione di lavoro, è fondamentale implementare una pipeline di elaborazione dati su larga scala responsabile dell’estrazione e del raffinamento delle caratteristiche degli annunci di lavoro aggregati da Talent.com. Questa pipeline è in grado di elaborare 5 milioni di record al giorno in meno di un’ora e consente di elaborare contemporaneamente record di più giorni. Inoltre, questa soluzione permette una rapida implementazione in produzione. La fonte principale di dati per questa pipeline è il formato JSON Lines, archiviato in Amazon Simple Storage Service (Amazon S3) e suddiviso per data. Questo comporta la generazione giornaliera di decine di migliaia di file JSON Lines, con aggiornamenti incrementali quotidiani.

L’obiettivo principale di questa pipeline di elaborazione dati è facilitare la creazione delle caratteristiche necessarie per l’addestramento e l’implementazione del motore di raccomandazione di lavoro su Talent.com. È importante notare che questa pipeline deve supportare gli aggiornamenti incrementali e soddisfare i requisiti complessi di estrazione delle caratteristiche necessarie per i moduli di addestramento e implementazione essenziali per il sistema di raccomandazione di lavoro. La nostra pipeline fa parte della famiglia di processi ETL (estrazione, trasformazione e caricamento) che combinano dati da diverse fonti in un repository centrale di grandi dimensioni.

Per ulteriori approfondimenti su come Talent.com e AWS hanno costruito in collaborazione tecniche all’avanguardia di elaborazione del linguaggio naturale e addestramento di modelli di apprendimento profondo, utilizzando Amazon SageMaker per creare un sistema di raccomandazione di lavoro, consulta Dal testo al lavoro dei sogni: la costruzione di un sistema di raccomandazione di lavoro basato su NLP presso Talent.com con Amazon SageMaker. Il sistema include l’ingegnerizzazione delle caratteristiche, la progettazione dell’architettura del modello di apprendimento profondo, l’ottimizzazione degli iperparametri e la valutazione del modello, dove tutti i moduli vengono eseguiti utilizzando Python.

Questo post mostra come abbiamo utilizzato SageMaker per creare una pipeline di elaborazione dati su larga scala per preparare le caratteristiche per il motore di raccomandazione di lavoro presso Talent.com. La soluzione risultante consente a un Data Scientist di ideare l’estrazione delle caratteristiche in un notebook SageMaker utilizzando librerie Python, come Scikit-Learn o PyTorch, e successivamente di implementare rapidamente lo stesso codice nella pipeline di elaborazione dati che esegue l’estrazione delle caratteristiche su larga scala. La soluzione non richiede la trasposizione del codice di estrazione delle caratteristiche per utilizzare PySpark, come richiesto quando si utilizza AWS Glue come soluzione ETL. La nostra soluzione può essere sviluppata e implementata interamente da un Data Scientist, utilizzando solo SageMaker e senza bisogno di conoscere altre soluzioni ETL, come AWS Batch. Ciò può ridurre significativamente il tempo necessario per implementare la pipeline di Machine Learning (ML) in produzione. La pipeline viene gestita attraverso Python e si integra in modo trasparente con i flussi di lavoro di estrazione delle caratteristiche, rendendola adattabile a una vasta gamma di applicazioni di analisi dei dati.

Panoramica della soluzione

Panoramica della pipeline di ETL utilizzando SageMaker Processing

Il pipeline è composto da tre fasi principali:

  1. Utilizzare un lavoro di Elaborazione SageMaker di Amazon per gestire file JSONL grezzi associati a un giorno specificato. Più giorni di dati possono essere processati da lavori di elaborazione separati contemporaneamente.
  2. Utilizzare AWS Glue per il crawling dei dati dopo l’elaborazione di più giorni di dati.
  3. Caricare le caratteristiche elaborate per un intervallo di date specificato utilizzando SQL da una tabella di Amazon Athena, quindi addestrare e distribuire il modello raccomandato per il lavoro.

Elaborazione di file JSONL grezzi

Elaboriamo file JSONL grezzi per un giorno specificato utilizzando un lavoro di Elaborazione SageMaker. Il lavoro implementa l’estrazione delle caratteristiche e la compressione dei dati e salva le caratteristiche elaborate in file Parquet con 1 milione di record per file. Approfittiamo della parallelizzazione della CPU per eseguire l’estrazione delle caratteristiche per ogni file JSONL grezzo in parallelo. I risultati dell’elaborazione di ciascun file JSONL vengono salvati in un file Parquet separato all’interno di una directory temporanea. Dopo che tutti i file JSONL sono stati elaborati, eseguiamo la compressione di migliaia di piccoli file Parquet in diversi file con 1 milione di record per file. I file Parquet compressi vengono quindi caricati in Amazon S3 come output del lavoro di elaborazione. La compressione dei dati garantisce un crawling efficiente e query SQL nelle fasi successive del pipeline.

Ecco il codice di esempio per programmare un lavoro di Elaborazione SageMaker per un giorno specificato, ad esempio il 2020-01-01, utilizzando il SDK di SageMaker. Il lavoro legge file JSONL grezzi da Amazon S3 (ad esempio da s3://bucket/raw-data/2020/01/01) e salva i file Parquet compressi in Amazon S3 (ad esempio in s3://bucket/processed/table-name/day_partition=2020-01-01/).

### installa le dipendenze %pip install sagemaker pyarrow s3fs awswranglerimport sagemakerimport boto3from sagemaker.processing import FrameworkProcessorfrom sagemaker.sklearn.estimator import SKLearnfrom sagemaker import get_execution_rolefrom sagemaker.processing import ProcessingInput, ProcessingOutputregion = boto3.session.Session().region_namerole = get_execution_role()bucket = sagemaker.Session().default_bucket()### utilizziamo un'istanza con 16 CPU e 128 GiB di memoria### nota che lo script NON caricherà l'intero dataset in memoria durante la compressione### a seconda delle dimensioni dei singoli file jsonl, potrebbe essere necessaria un'istanza più grandeinstance = "ml.r5.4xlarge"n_jobs = 8  ### utilizziamo 8 process workersdata = "2020-01-01" ### elabora i dati per un giornoest_cls = SKLearnframework_version_str = "0.20.0"### pianifica il lavoro di elaborazione script_processor = FrameworkProcessor(    role=role,    instance_count=1,    instance_type=instance,    estimator_cls=est_cls,    framework_version=framework_version_str,    volume_size_in_gb=500,)script_processor.run(    code="processing_script.py", ### nome dello script principale di elaborazione    source_dir="../src/etl/", ### localizzazione della directory del codice sorgente    ### il nostro script di elaborazione carica i file jsonl grezzi direttamente da S3    ### ciò evita lunghi tempi di avvio dei lavori di elaborazione,    ### poiché i dati grezzi non devono essere copiati nell'istanza    inputs=[], ### l'input del lavoro di elaborazione è vuoto    outputs=[        ProcessingOutput(destination="s3://bucket/processed/table-name/",                         source="/opt/ml/processing/output"),    ],    arguments=[        ### directory con l'output del lavoro        "--output", "/opt/ml/processing/output",        ### directory temporanea all'interno dell'istanza        "--tmp_output", "/opt/ml/tmp_output",        "--n_jobs", str(n_jobs), ### numero di process workers        "--date", date, ### data da elaborare        ### localizzazione con i file jsonl grezzi in S3        "--path", "s3://bucket/raw-data/",    ],    wait=False)

Il seguente è un esempio di codice per lo script principale (processing_script.py) che esegue il lavoro di Elaborazione SageMaker:

import concurrentimport pyarrow.dataset as dsimport osimport s3fsfrom pathlib import Path### funzione per elaborare il file jsonl grezzo e salvare le caratteristiche estratte in un file parquet  from process_data import process_jsonl### analizza gli argomenti da riga di comandoargs = parse_args()### utilizziamo s3fs per cercare i file jsonl grezzi nel percorso di input S3fs = s3fs.S3FileSystem()### assumiamo che i file jsonl grezzi siano archiviati in directory S3 suddivise per data### ad esempio: s3://bucket/raw-data/2020/01/01/jsons = fs.find(os.path.join(args.path, *args.date.split('-')))### localizzazione della directory temporanea all'interno del lavoro di elaborazionetmp_out = os.path.join(args.tmp_output, f"day_partition={args.date}")### localizzazione della directory con l'output del lavorodir_out = os.path.join(args.output, f"day_partition={args.date}")### elabora i file jsonl individuali in parallelo utilizzando workers di processo n_jobsfutures=[]with concurrent.futures.ProcessPoolExecutor(max_workers=args.n_jobs) as executor:    for file in jsons:        inp_file = Path(file)        out_file = os.path.join(tmp_out, inp_file.stem + ".snappy.parquet")        ### la funzione process_jsonl legge il file jsonl grezzo dalla localizzazione di S3 (inp_file)        ### e salva il risultato in un file parquet (out_file) all'interno della directory temporanea        futures.append(executor.submit(process_jsonl, file, out_file))    ### attendi fino a quando tutti i file jsonl sono elaborati    for future in concurrent.futures.as_completed(futures):        result = future.result()### compatta i file parquetdataset = ds.dataset(tmp_out)if len(dataset.schema) > 0:    ### salva i file parquet compressi con 1 milione di record per file    ds.write_dataset(dataset, out_dir, format="parquet",                      max_rows_per_file=1024 * 1024)

La scalabilità è una caratteristica chiave della nostra pipeline. In primo luogo, più job di Elaborazione SageMaker possono essere utilizzati per elaborare dati per diversi giorni contemporaneamente. In secondo luogo, evitiamo di caricare l’intero set di dati processati o grezzi in memoria contemporaneamente, mentre elaboriamo ogni giorno specificato dei dati. Questo consente l’elaborazione dei dati utilizzando tipi di istanze che non possono contenere un’intera giornata di dati nella memoria primaria. L’unico requisito è che il tipo di istanza sia in grado di caricare N file JSONL grezzi o Parquet processati in memoria contemporaneamente, con N che rappresenta il numero di processi lavoratori utilizzati.

Esplora i dati processati utilizzando AWS Glue

Dopo che tutti i dati grezzi per più giorni sono stati processati, possiamo creare una tabella Athena dall’intero dataset utilizzando un crawler AWS Glue. Utilizziamo la libreria AWS SDK per pandas (awswrangler) per creare la tabella utilizzando il seguente codice:

import awswrangler as wr### esplora i dati processati in S3res = wr.s3.store_parquet_metadata(    path='s3://bucket/processed/table-name/',    database="database_name",    table="table_name",    dataset=True,    mode="overwrite",    sampling=1.0,    path_suffix='.parquet',)### stampa lo schema della tabellaprint(res[0])

Carica le caratteristiche processate per l’addestramento

Le caratteristiche processate per un intervallo di date specificato possono ora essere caricate dalla tabella Athena utilizzando SQL e queste caratteristiche possono quindi essere utilizzate per l’addestramento del modello di raccomandazione degli annunci di lavoro. Ad esempio, il seguente blocco di codice carica un mese di caratteristiche processate in un DataFrame utilizzando la libreria awswrangler:

import awswrangler as wrquery = """    SELECT *     FROM table_name    WHERE day_partition BETWEEN '2020-01-01' AND '2020-02-01' """### carica 1 mese di dati da database_name.table_name in un DataFrame df = wr.athena.read_sql_query(query, database='database_name')

Inoltre, l’uso di SQL per il caricamento delle caratteristiche processate per l’addestramento può essere esteso per soddisfare vari altri casi d’uso. Ad esempio, possiamo applicare una pipeline simile per mantenere due tabelle Athena separate: una per memorizzare le impressioni degli utenti e un’altra per memorizzare i clic degli utenti su queste impressioni. Utilizzando le istruzioni di join SQL, possiamo recuperare le impressioni su cui gli utenti hanno cliccato o non hanno cliccato e quindi passare queste impressioni a un job di addestramento del modello.

Vantaggi della soluzione

L’implementazione della soluzione proposta porta diversi vantaggi al nostro flusso di lavoro esistente, tra cui:

  • Implementazione semplificata – La soluzione consente l’estrazione delle caratteristiche da essere implementata in Python utilizzando librerie di ML popolari. Inoltre, non richiede che il codice venga convertito in PySpark. Ciò semplifica l’estrazione delle caratteristiche in quanto lo stesso codice sviluppato da un Data Scientist in un notebook verrà eseguito da questa pipeline.
  • Percorso rapido verso la produzione – La soluzione può essere sviluppata e implementata da un Data Scientist per eseguire l’estrazione delle caratteristiche su larga scala, permettendo loro di sviluppare un modello di raccomandazione di ML su tali dati. Allo stesso tempo, la stessa soluzione può essere implementata in produzione da un ML Engineer con poche modifiche necessarie.
  • Riusabilità – La soluzione fornisce un modello riutilizzabile per l’estrazione delle caratteristiche su larga scala e può essere facilmente adattata per altri casi d’uso oltre alla creazione dei modelli di raccomandazione.
  • Efficienza – La soluzione offre buone prestazioni: l’elaborazione di un singolo giorno dei dati di Talent.com ha richiesto meno di 1 ora.
  • Aggiornamenti incrementali – La soluzione supporta anche aggiornamenti incrementali. I nuovi dati giornalieri possono essere elaborati con un job di Elaborazione SageMaker e la posizione S3 contenente i dati processati può essere nuovamente esplorata per aggiornare la tabella Athena. Possiamo anche utilizzare un job di cron per aggiornare i dati odierni più volte al giorno (ad esempio, ogni 3 ore).

Abbiamo utilizzato questa pipeline ETL per aiutare Talent.com a elaborare 50.000 file al giorno contenenti 5 milioni di record e a creare dati di addestramento utilizzando le caratteristiche estratte da 90 giorni di dati grezzi da Talent.com, per un totale di 450 milioni di record su 900.000 file. La nostra pipeline ha aiutato Talent.com a costruire e implementare il sistema di raccomandazione in produzione in sole 2 settimane. La soluzione ha eseguito tutti i processi di ML, inclusa l’ETL, su Amazon SageMaker senza utilizzare altri servizi AWS. Il sistema di raccomandazione degli annunci di lavoro ha portato ad un aumento del 8,6% del tasso di click in test A/B online rispetto alla soluzione precedente basata su XGBoost, aiutando a connettere milioni di utenti di Talent.com a lavori migliori.

Conclusione

In questo post viene descritto il pipeline ETL che abbiamo sviluppato per il processing delle caratteristiche per l’addestramento e la distribuzione di un modello di raccomandazione di lavoro su Talent.com. La nostra pipeline utilizza i job di SageMaker Processing per il processing dei dati e l’estrazione delle caratteristiche in modo efficiente su larga scala. Il codice di estrazione delle caratteristiche è implementato in Python, consentendo l’utilizzo delle popolari librerie di ML per eseguire l’estrazione delle caratteristiche su larga scala, senza la necessità di modificare il codice per l’uso di PySpark.

Invitiamo i lettori ad esplorare la possibilità di utilizzare la pipeline presentata in questo blog come modello per i loro casi d’uso in cui è richiesta l’estrazione delle caratteristiche su larga scala. La pipeline può essere sfruttata da uno scienziato dei dati per costruire un modello di ML e la stessa pipeline può quindi essere adottata da un ingegnere di ML per l’esecuzione in produzione. Ciò può ridurre significativamente il tempo necessario per rendere il prodotto finale del sistema di ML, come è stato il caso di Talent.com. I lettori possono fare riferimento al tutorial per la configurazione e l’esecuzione dei job di SageMaker Processing. Suggeriamo inoltre ai lettori di visualizzare il post Dal testo al lavoro dei sogni: Costruire un sistema di raccomandazione di lavoro basato su NLP su Talent.com con Amazon SageMaker, in cui discutiamo le tecniche di addestramento di modelli di deep learning utilizzando Amazon SageMaker per costruire il sistema di raccomandazione di lavoro di Talent.com.