Utilizzando RAPIDS cuDF per sfruttare la GPU nell’ingegneria delle feature.

Using RAPIDS cuDF to leverage GPU for feature engineering.

Migliorare le prestazioni sostituendo Pandas con cuDF nella creazione di Data Frames e nell’elaborazione delle funzionalità e integrandolo con Google Colab.

Il fatto che particolari metodi abbiano avuto successo nel risolvere un problema potrebbe non portare allo stesso risultato su una scala diversa. Quando le distanze cambiano, anche le scarpe devono cambiare.

Nell’apprendimento automatico, i dati e l’elaborazione dei dati sono cruciali per garantire il successo del modello, e l’ingegneria delle caratteristiche fa parte di quel processo. Quando i dati sono pochi, la classica libreria Pandas può gestire facilmente qualsiasi attività di elaborazione sulla CPU. Tuttavia, Pandas può essere troppo lento nell’elaborazione di grandi quantità di dati. Una soluzione per migliorare la velocità e l’efficienza nell’elaborazione dei dati e nell’ingegneria delle caratteristiche è RAPIDS.

“RAPIDS è una suite di librerie software open source per l’esecuzione di pipeline di scienza dei dati e analisi end-to-end interamente su unità di elaborazione grafica (GPU). RAPIDS accelera le pipeline di scienza dei dati per creare flussi di lavoro più produttivi. [1]”

Uno strumento di RAPIDS per manipolare efficientemente i dati tabulari nell’ingegneria delle caratteristiche e nella pre-elaborazione dei dati è cuDF. RAPIDS cuDF consente la creazione di data frame GPU e la performance di diverse operazioni Pandas come l’indicizzazione, il raggruppamento, il merging e la gestione delle stringhe. Come definito sul sito di RAPIDS:

“cuDF è una libreria di data frame GPU Python (costruita sul formato di memoria colonna Apache Arrow) per il caricamento, il join, l’aggregazione, il filtraggio e la manipolazione di dati tabulari usando un’API di stile DataFrame nel formato di Pandas. [2]”

Questo articolo cerca di spiegare come creare e manipolare i data frame e applicare l’ingegneria delle caratteristiche con cuDF sulla GPU utilizzando un dataset reale.

Il nostro dataset appartiene alla Optiver Realized Volatility Prediction di Kaggle. Contiene dati di mercato azionario rilevanti per l’esecuzione pratica di negoziazioni nei mercati finanziari e include snapshot del libro degli ordini e istantanee di esecuzione degli scambi[3].

Scopriremo di più sui dati nella sezione seguente. Poi, integreremo Google Colab con Kaggle e RAPIDS. Nella terza sezione, vedremo come realizzare l’ingegneria delle caratteristiche su questo dataset utilizzando Pandas e cuDF. Ciò ci fornirà una revisione delle prestazioni comparativa delle due librerie. Nell’ultima sezione, plottiamo ed valutiamo i risultati.

Dati

I dati che utilizzeremo consistono in due serie di file[3]:

  1. book_[train/test].parquet: un file parquet, che è partizionato per stock_id, fornisce dati sul libro degli ordini sui buy e sell più competitivi inseriti nel mercato. Questo file contiene aggiornamenti passivi di acquisto / vendita.

Colonne delle caratteristiche in book_[train/test].parquet:

  • stock_id – codice ID per la stock. Parquet costringe questa colonna al tipo di dati categorico quando caricato.
  • time_id – codice ID per il bucket di tempo. Gli ID temporali non sono necessariamente sequenziali ma sono coerenti su tutte le azioni.
  • seconds_in_bucket – Numero di secondi dall’inizio del bucket, sempre a partire da 0.
  • bid_price[1/2] – Prezzi normalizzati del buy level più competitivo / secondo più competitivo.
  • ask_price[1/2] – Prezzi normalizzati del sell level più competitivo / secondo più competitivo.
  • bid_size[1/2] – Il numero di azioni sul buy level più competitivo / secondo più competitivo.
  • ask_size[1/2] – Il numero di azioni sul sell level più competitivo / secondo più competitivo.

Questo file è di 5,6 GB e contiene oltre 167 milioni di voci. Ci sono 112 azioni e 3830 finestre di tempo di 10 minuti (time_id). Ogni finestra di tempo (bucket) ha un massimo di 600 secondi. Poiché una transazione può verificarsi ogni secondo in ogni finestra di tempo per ogni azione, la moltiplicazione dei numeri menzionati può spiegare perché abbiamo milioni di voci. Un avvertimento è che non ogni secondo si verifica una transazione, il che significa che alcuni secondi in una particolare finestra di tempo sono mancanti.

  1. trade_[train/test].parquet: un file parquet, che è partizionato per stock_id, contiene dati su negoziazioni effettivamente eseguite.

Colonne delle caratteristiche in trade_[train/test].parquet:

  • stock_id – come sopra.
  • time_id – come sopra.
  • seconds_in_bucket – come sopra. Si noti che poiché i dati di trade e di libro vengono presi dalla stessa finestra di tempo e i dati di trade sono più sparsi in generale, questo campo non inizia necessariamente da 0.
  • price – Il prezzo medio delle transazioni eseguite che avvengono in un secondo. I prezzi sono stati normalizzati e la media è stata pesata dal numero di azioni negoziate in ogni transazione.
  • size – Il numero totale di azioni scambiate.
  • order_count – Il numero di ordini di negoziazione unici in corso.

Il file trade_[train/test].parquet è molto più piccolo di book_[train/test].parquet. Il primo è di 512,5 MB e ha più di 38 milioni di voci. Poiché le transazioni effettive non devono corrispondere alle intenzioni, i dati commerciali sono più sparsi e quindi ci sono meno voci.

L’obiettivo è quello di prevedere la volatilità effettiva del prezzo delle azioni calcolata nella finestra di 10 minuti successiva dai dati di riferimento relativi allo stesso stock_id/time_id. Questo progetto prevede una grande quantità di ingegneria delle caratteristiche che dovrebbe essere eseguita su un grande dataset. Lo sviluppo di nuove caratteristiche aumenterà anche la dimensione dei dati e la complessità computazionale. Un rimedio consiste nell’utilizzare cuDF invece della libreria Pandas.

In questo blog, vedremo alcune attività di ingegneria delle caratteristiche e manipolazioni di frame di dati che proveranno sia Pandas che cuDF per confrontare le loro prestazioni. Tuttavia, non utilizzeremo tutti i dati, ma solo i record di una singola azione per vedere un’implementazione esemplare. Si può controllare il notebook per vedere tutto il lavoro di ingegneria delle caratteristiche svolto sull’intero dataset.

Dato che eseguiamo il codice su Google Colab, dovremmo prima configurare il nostro notebook per integrare Kaggle e RAPIDS.

Configurazione del Notebook Google Colab

Ci sono alcuni passaggi per configurare il notebook Colab:

  1. Crea un token API sull’account Kaggle per autenticare il notebook con i servizi Kaggle.

Vai su Impostazioni e clicca su “Crea nuovo token”. Verrà scaricato un file chiamato “kaggle.json” che contiene il nome utente e la chiave API.

  1. Avvia un nuovo notebook su Google Colab e carica il file kaggle.json.

Carica il file kaggle.json in Google Colab cliccando sull’icona “Carica nella memoria della sessione”.

  1. Fai clic sul menu “Runtime” in alto sulla pagina, quindi su “Cambia tipo di runtime” e conferma che il tipo di istanza è GPU .
  2. Esegui il comando seguente e controlla l’output per assicurarti di aver ottenuto un Tesla T4, P4 o P100.
!nvidia-smi
  1. Scarica i file di installazione di RAPIDS-Colab e controlla la tua GPU:
!git clone https://github.com/rapidsai/rapidsai-csp-utils.git
!python rapidsai-csp-utils/colab/pip-install.py

Assicurati che la tua istanza di Colab sia compatibile con RAPIDS nell’output di questa cella.

  1. Verifica se le librerie RAPIDS sono state installate correttamente:
import cudf, cuml
cudf.__version__

Siamo pronti con la configurazione di Google Colab se l’installazione non presenta errori. Ora possiamo caricare il dataset Kaggle.

Importazione e Caricamento del Dataset Kaggle

Dobbiamo fare alcune disposizioni nella nostra istanza Colab per importare il dataset da Kaggle.

  1. Installa la libreria Kaggle:
!pip install -q kaggle
  1. Crea una directory chiamata “.kaggle”:
!mkdir ~/.kaggle
  1. Copia il file “kaggle.json” in questa nuova directory:
!cp kaggle.json ~/.kaggle/
  1. Assegna le autorizzazioni richieste per questo file:
!chmod 600 ~/.kaggle/kaggle.json
  1. Scarica il dataset da Kaggle:
!kaggle competitions download optiver-realized-volatility-prediction
  1. Crea una directory per i dati non compressi:
!mkdir train
  1. Decomprimi i dati nella nuova directory:
!unzip optiver-realized-volatility-prediction.zip -d train
  1. Importa tutte le altre librerie che ci servono:
import glob
import numpy as np
import pandas as pd
from cudf import DataFrame
import matplotlib.pyplot as plt
from matplotlib import style
from collections import defaultdict
from IPython.display import display
import gc
import time
import warnings
%matplotlib inline
  1. Imposta le opzioni di Pandas:
pd.set_option("display.max_colwidth", None)
pd.set_option("display.max_columns", None)
warnings.filterwarnings("ignore")

print("Soglia:", gc.get_threshold())
print("Conteggio:", gc.get_count())
  1. Definisci i parametri:
# Directory dei dati che contiene i file
DIR = "/content/train/"

# Numero di cicli di esecuzione
ROUNDS = 30
  1. Ottieni i file:
# Ottieni i book di ordini e di scambi
order_files = glob.glob(DIR + "book_train.parquet" + "/*")
trade_files = glob.glob(DIR + "trade_train.parquet" + "/*")
print(order_files[:5])
print("\n")
print(trade_files[:5])
print("\n")

# Ottieni gli stock_ids come lista
stock_ids = sorted([int(file.split('=')[1]) for file in order_files])
print(f"{len(stock_ids)} azioni: \n {stock_ids} \n")

Ora il nostro notebook è pronto per eseguire tutte le operazioni di dataframe sui dati e per eseguire l’ingegneria delle caratteristiche.

Ingegneria delle Caratteristiche

In questa sezione verranno discusse 13 tipiche operazioni di ingegneria su Pandas dataframe e cuDF. Vedremo quanto tempo impiegano queste operazioni e quanta memoria utilizzano. Iniziamo caricando i dati.

1. Caricamento dei dati

def load_dataframe(files, dframe=0):

   print("CARICAMENTO DEI DATA FRAMES", "\n")
  
   # Carica il dataframe di Pandas
   if dframe == 0:
     print("Caricamento del dataframe di Pandas..", "\n")
     start = time.time()
     df_pandas = pd.read_parquet(files[0])
     end = time.time()
     elapsed_time = round(end-start, 3)
     print(f"Per il dataframe di Pandas: \n tempo di inizio: {start} \n tempo di fine: {end} \n tempo trascorso: {elapsed_time} \n")
     return df_pandas, elapsed_time

   # Carica il dataframe di cuDF
   else:
     print("Caricamento del dataframe di cuDF..", "\n")
     start = time.time()
     df_cudf = cudf.read_parquet(files[0])
     end = time.time()
     elapsed_time = round(end-start, 3)
     print(f"Per il dataframe di cuDF: \n tempo di inizio: {start} \n tempo di fine: {end} \n tempo trascorso: {elapsed_time} \n ")

     return df_cudf, elapsed_time

Quando dframe=0, i dati verranno caricati come un dataframe di Pandas, altrimenti di cuDF. Ad esempio,

Pandas:

# Carica il dataframe degli ordini di Pandas e calcola il tempo
df_pd_order, _ = load_dataframe(order_files, dframe=0)
display(df_pd_order.head())

Questo restituirà i primi cinque record del Book degli Ordini (book_[train/test].parquet):

cuDF:

# Carica il dataframe dei book di cuDF e calcola il tempo
df_cudf_order, _ = load_dataframe(order_files, dframe=1)
display(df_cudf_order.head())

Output:

Exhibit-7: Caricamento dei dati come cuDF (Immagine dell’autore)

Otteniamo informazioni sui dati del libro degli ordini dalla versione di Pandas:

# Informazioni sul dataframe degli ordini
display(df_pd_order.info())

Output:

Exhibit-8: Informazioni sui dati del libro degli ordini del primo stock (immagine dell’autore)

L’immagine sopra ci dice che il primo stock ha circa 1,4 milioni di voci e occupa 47,8 MB di spazio nella memoria. Per ridurre lo spazio e aumentare la velocità, dovremmo convertire i tipi di dati in formati minori, cosa che faremo in seguito.

In modo simile, carichiamo i dati del Trade Book (trade_[train/test].parquet) in entrambe le librerie di dataframe come abbiamo fatto per i dati del libro degli ordini. I dati e le relative informazioni saranno così:

Exhibit-9: Dati del Trade Book del primo stock e relative informazioni (immagine dell’autore)

I dati di trading per il primo stock sono di 3,7 MB e hanno oltre 276 mila record.

In entrambi i file (Order Book e Trade Book), non ogni finestra di tempo ha 600 punti di secondi. In altre parole, un particolare intervallo di tempo potrebbe avere transazioni o offerte solo in alcuni secondi nell’intervallo di 10 minuti. Questo ci fa affrontare dati sparsi in entrambi i file in cui mancano alcuni secondi. Dovremmo correggerlo riempiendo in avanti tutte le colonne per i secondi mancanti. Mentre Pandas ci consente di fare il forward filling, cuDF non ha questa funzionalità. Quindi, faremo il forward filling in Pandas e ricreeremo il cuDF dal dataframe Pandas forward-filled. Ci rammarichiamo di questo, poiché l’obiettivo centrale di questo blog è mostrare come cuDF superi Pandas. Ho controllato la questione più volte in passato, ma con la mia migliore conoscenza non sono riuscito a trovare il metodo in cuDF come implementato in Pandas. Quindi, possiamo fare il forward-filling come segue [4]:

# Forward fill data
def ffill(df, df_name="order"):
  
   # Forward fill
   df_pandas = df.set_index(['time_id', 'seconds_in_bucket'])

   if df_name == "order":
     df_pandas = df_pandas.reindex(pd.MultiIndex.from_product([df_pandas.index.levels[0], np.arange(0,600)], names = ['time_id', 'seconds_in_bucket']), method='ffill')
     df_pandas = df_pandas.reset_index()
    
   else:
     df_pandas = df_pandas.reindex(pd.MultiIndex.from_product([df_pandas.index.levels[0], np.arange(0,600)], names = ['time_id', 'seconds_in_bucket']))
     # Fill nan values with 0
     df_pandas = df_pandas.fillna(0)
     df_pandas = df_pandas.reset_index()   

   # Convert to a cudf dataframe
   df_cudf = cudf.DataFrame.from_pandas(df_pandas)

   return df_pandas, df_cudf

Prendiamo i dati dell’ordine come esempio e vediamo come vengono elaborati:

# Forward fill order dataframes
expanded_df_pd_order, expanded_df_cudf_order = ffill(df_pd_order, df_name="order")
display(expanded_df_cudf_order.head())

Exhibit-10: Forward Filling Dei Dati Dell’Ordine (Immagine Dell’autore)

A differenza dei dati in Exhibit 7, i dati forward-filled in Exhibit 10 hanno tutti i 600 secondi nel bucket di tempo “5” da 0 a 599, inclusi. Facciamo la stessa operazione anche sui dati del trade.

2. Unione dei Data Frames

Abbiamo due set di dati, ordine e trade, entrambi forward-filled. Entrambi i set di dati sono rappresentati nei framework Pandas e cuDF. Successivamente, uniremo i set di dati dell’ordine e del trade su time_id e seconds_in_buckets.

def merge_dataframes(df1, df2, dframe=0):

   print("UNIONE DEI DATA FRAMES", "\n")
  
   if dframe == 0:
     df_type = "Pandas"
   else:
     df_type = "cuDF"

   # Merge dataframes
   print(f"Unione dei {df_type} dataframes..", "\n")
   start = time.time()
   df = df1.merge(df2, how="left", on=["time_id", "seconds_in_bucket"], sort=True)
   end = time.time()
   elapsed_time = round(end-start, 3)
   print(f"Per i {df_type} dataframes: \n tempo di inizio: {start} \n tempo di fine: {end} \n tempo trascorso: {elapsed_time} \n")

   return df, elapsed_time

cuDF eseguirà il seguente comando:

# Unisci i dataframe cuDF degli ordini e delle negoziazioni
df_cudf, cudf_merge_time = merge_dataframes(expanded_df_cudf_order, expanded_df_cudf_trade, dframe=1)
display(df_cudf.head())

expanded_df_cudf_trade è il dato forward-filled delle negoziazioni e viene ottenuto allo stesso modo di expanded_df_pd_order o expanded_df_cudf_order. L’operazione di unione creerà un dataframe combinato come mostrato di seguito:

Exhibit-11: Unione di Data Frame (Immagine dell’autore)

Tutte le colonne dei due dataset vengono combinate in uno. L’operazione di unione viene ripetuta anche per i data frame di Pandas.

3. Modifica del Dtype

Vogliamo modificare il tipo di dati di alcune colonne per ridurre lo spazio di memoria e aumentare la velocità di calcolo.

# Apporta modifiche al dtype
def change_dtype(df, dframe=0):

   print("MODIFICA DTYPES", "\n")

   convert_dict = {"time_id": "int16",
                   "seconds_in_bucket": "int16",
                   "bid_size1": "int16",
                   "ask_size1": "int16",
                   "bid_size2": "int16",
                   "ask_size2": "int16",
                   "size": "int16",
                   "order_count": "int16"
                   } 

   df = df.astype(convert_dict)

   return df, dframe

Quando eseguiamo il comando seguente:

# Apporta modifiche al dtype per il dataframe cuDF
df_cudf, _ = change_dtype(df_cudf)
display(df_cudf.info())

otteniamo il seguente output:

Exhibit-12: Modifica del Dtype (Immagine dell’autore)

I dati dell’Exhibit 12 utilizzerebbero più spazio di memoria se non venisse effettuata la conversione del tipo di dati. Ha ancora 78,9 MB ma, questo è stato dopo le operazioni di forward-fill e merge, che hanno prodotto 13 colonne e 2,3 milioni di voci.

Eseguiamo ogni attività di feature engineering sia per Pandas DF che per cuDF. Qui mostriamo solo quella per cuDF come esempio.

4. Ottenere Time Ids unici

Utilizzeremo il metodo unico per estrarre i time_ids in questa sezione.

# Ottieni i valori unici nella colonna time_id e mettili in una lista
def get_unique_timeids(df, dframe=0):

   global time_ids

   print("OTTENIMENTO VALORI UNICI", "\n")

   # Ottieni i time_ids unici
   if dframe == 0:
     print(f"Ottenere i time_ids unici ordinati dal dataframe di Pandas..", "\n")
     start = time.time()
     time_ids = sorted(df['time_id'].unique().tolist())
     end = time.time()
     elapsed_time = round(end-start, 3)
     print(f"Time_ids unici dal dataframe di Pandas: \n inizio: {start} \n fine: {end} \n tempo trascorso: {elapsed_time} \n")

   else:
     print(f"Ottenere i time_ids unici ordinati dal dataframe cuDF..", "\n")
     start = time.time()
     time_ids = sorted(df['time_id'].unique().to_arrow().to_pylist())
     end = time.time()
     elapsed_time = round(end-start, 3)
     print(f"Time_ids unici dal dataframe cuDF: \n inizio: {start} \n fine: {end} \n tempo trascorso: {elapsed_time} \n")

   print(f"{len(time_ids)} time buckets: \n {time_ids[:10]}...")
   print("\n")

   return df, time_ids

Il codice sopra otterrà i time_ids unici da Pandas DF e cuDF.

# Ottieni i time_ids dal dataframe cuDF
time_ids = get_unique_timeids(df_cudf_order, dframe=1)

L’output di cuDF appare come segue:

Exhibit-13: Ottenere Time Ids unici (Immagine dell’autore)

5. Verifica dei valori Null

In seguito, verificheremo i valori Null nei data frame.

# Verifica dei valori null nel df
def check_null_values(df, dframe=0):

   print("VERIFICA DEI VALORI NULL", "\n")

   print("Verifica dei valori null del data frame..", "\n")
   display(df.isna().values.any())
   display(df.isnull().sum())

   return df, dframe

Esempio di verifica dei valori null in cuDF:

# Verifica dei valori null per il data frame cuDF
df_cudf, _ = check_null_values(df_cudf, dframe=0)

E l’output è:

Mostra-14: Verifica dei Valori Null (Immagine dell’autore)

6. Aggiunta di una Colonna

Vogliamo creare più funzionalità, quindi aggiungiamo alcune colonne.

# Aggiungi colonne
def add_column(df, dframe=0):

   print("AGGIUNTA DI COLONNE", "\n")

   # Calcolare i WAP
   df['wap1'] = (df['bid_price1'] * df['ask_size1'] + df['ask_price1'] * df['bid_size1']) / (df['bid_size1'] + df['ask_size1'])
   df['wap2'] = (df['bid_price2'] * df['ask_size2'] + df['ask_price2'] * df['bid_size2']) / (df['bid_size2'] + df['ask_size2'])

   # Calcolare i volumi dell'ordine
   df['bid1_volume'] = df['bid_price1'] * df['bid_size1']
   df['bid2_volume'] = df['bid_price2'] * df['bid_size2']
   df['ask1_volume'] = df['ask_price1'] * df['ask_size1']
   df['ask2_volume'] = df['ask_price2'] * df['ask_size2']

   # Calcolare lo sbilanciamento del volume
   df['imbalance'] = np.absolute((df['ask_size1'] + df['ask_size2']) - (df['bid_size1'] + df['bid_size2']))

   # Calcolo dello sbilanciamento del volume delle negoziazioni
   df['volume_imbalance'] = np.absolute((df['bid_price1'] * df['bid_size1']) - (df['ask_price1'] * df['ask_size1']))

   return df, dframe

Ciò creerà nuove funzionalità come il prezzo medio ponderato (wap1 e wap2), il volume dell’ordine e lo sbilanciamento del volume. In totale, otto colonne verranno aggiunte ai data frame eseguendo quanto segue:

# Aggiungi una colonna nel data frame cuDF
df_cudf, _ = add_column(df_cudf)
display(df_cudf.head())

Ciò darà come risultato:

Mostra-15: Aggiunta di Colonne e Funzionalità (Immagine dell’autore)

7. Eliminazione di una Colonna

Decidiamo di eliminare due funzionalità, wap1 e wap2, eliminando le loro colonne:

# Elimina colonne
def drop_column(df, dframe=0):

   print("ELIMINAZIONE DELLE COLONNE", "\n")

   df.drop(columns=['wap1', 'wap2'], inplace=True)

   return df, dframe

L’implementazione dell’eliminazione delle colonne è:

# Aggiungi una colonna nel data frame cuDF
df_cudf, _ = drop_column(df_cudf)
display(df_cudf.head())

Ciò ci lascia con i data frame in cui le colonne wap1 e wap2 sono scomparse!

8. Calcolo delle Statistiche per Gruppo

Successivamente, calcoliamo la media, la mediana, il massimo, il minimo, la deviazione standard e la somma di alcune funzionalità per time_id. Per questo, utilizzeremo i metodi groupby e agg.

# Calcola le statistiche per le funzionalità selezionate
def calc_agg_stats(df, dframe=0):

   print("CALCOLO DELLE STATISTICHE", "\n")

   # Calcoli statistici da effettuare
   operations = ["mean", "median", "max", "min", "std", "sum"]

   # Funzionalità per le quali verranno effettuati i calcoli statistici
   features_list = ["bid1_volume", "bid2_volume", "ask1_volume", "ask2_volume"]

   # Creazione di un dizionario per memorizzare le coppie funzionalità-calcolo
   stats_dict = defaultdict(list)
   for feature in features_list:
       stats_dict[feature].extend(operations)

   # Calcola le statistiche aggregate
   df_stats = df.groupby('time_id', as_index=False, sort=True).agg(stats_dict)

   return df, df_stats

Creiamo una lista chiamata features_list per specificare le caratteristiche su cui verranno eseguiti i calcoli matematici.

# Calcola le statistiche dalle caratteristiche selezionate in un dataframe cuDF
_, df_cudf_stats = calc_agg_stats(df_cudf)
display(df_cudf_stats.head())

In cambio, otteniamo l’output seguente: Exhibit-16: Calcolo delle statistiche (Immagine dell’autore)

La tabella restituita è un nuovo data frame. Dovremmo unirlo con quello originale (df_cudf). Lo faremo attraverso Pandas:

# Unisci il dataframe con le statistiche
def merge_dataframes_2(df, dframe=0):

   if dframe == 0:
     df = df.merge(df_pd_stats, how="left", on="time_id", sort=True)
  
   else:
     df = df.to_pandas()
     df = df.merge(df_pd_stats, how="left", on="time_id", sort=True)
     df = cudf.DataFrame.from_pandas(df)

   return df, dframe


# Unisci i data frame cuDF
df_cudf, _ = merge_dataframes_2(df_cudf, dframe=1)
display(df_cudf.head())

Il frammento sopra riporterà df_pd_stats e df_pd in un data frame e lo salverà come df_cudf.

Come al solito, ripetiamo lo stesso compito per Pandas.

Il passo successivo è calcolare la correlazione tra due colonne:

# Calcola la correlazione tra due caratteristiche selezionate
def calc_corr(df, dframe=0):

 correlation = df[["bid1_volume", "ask1_volume"]].corr()
 print(f"La correlazione tra 'bid1_volume' e 'ask1_volume' è {correlation} \n")

 return df, correlation

Questo codice

# Calcola la correlazione nel dataframe cuDF
_ = calc_corr(df_cudf)

restituirà l’output seguente:

Exhibit-17: Calcolo della correlazione tra due caratteristiche (Immagine dell’autore)

9. Rinominare le colonne

Per eliminare ogni confusione, dovremmo rinominare due delle nostre colonne.

# Rinomina le colonne
def rename_cols(df, dframe=0):

   print("RINOMINA LE COLONNE", "\n")

   df = df.rename(columns={"imbalance": "volume_imbalance", "volume_imbalance": "trade_volume_imbalance"})

   return df, dframe

Le colonne imbalance e volume_imbalance saranno rinominate rispettivamente come volume_imbalance e trade_volume_imbalance.

10. Dividere una colonna in gruppi

Un’altra manipolazione dei dati che vogliamo fare è di dividere bid1_volume in gruppi e salvare i gruppi in una nuova colonna.

# Dividi una colonna selezionata in gruppi
def bin_col(df, dframe=0):

   print("DIVISIONE DI UNA COLONNA IN GRUPPI", "\n")

   if dframe == 0:
     df['bid1_volume_cut'] = pd.cut(df["bid1_volume"], bins=5, labels=["molto alta", "alta", "media", "bassa", "molto bassa"], ordered=True)

   else:
     df['bid1_volume_cut'] = cudf.cut(df["bid1_volume"], bins=5, labels=["molto alta", "alta", "media", "bassa", "molto bassa"], ordered=True)

   return df, dframe

Eseguendo le righe

# Dividi una colonna selezionata nel dataframe cuDF
df_cudf, _ = bin_col(df_cudf, dframe=1)
display(df_cudf.head())

otterremo un data frame come output, di cui possiamo vedere una parte come mostrato di seguito:

Exhibit-18: Divisione di una colonna in gruppi (Immagine dell’autore)

11. Visualizzazione dei data frame

Dopo che le operazioni di feature engineering sono state completate, possiamo presentare i data frame. Questa sezione contiene tre operazioni: visualizzazione del data frame, ottenere informazioni su di esso e descriverlo.

# Mostra il data frame
def display_df(df, dframe=0):

   print("MOSTRA I DATA FRAME", "\n")

   display(df.head())
   print("\n")

   return df, dframe


# Mostra le informazioni del data frame
def display_info(df, dframe=0):

   print("MOSTRA LE INFORMAZIONI DEL DATA FRAME", "\n")

   display(df.info())
   print("\n")

   return df, dframe


# Descrive il data frame
def describe_df(df, dframe=0):

   print("DESCRIVE I DATA FRAME", "\n")

   display(df.describe())
   print("\n")

   return df, dframe

Il codice seguente finalizzerà queste tre attività:

# Mostra il data frame cuDF e le informazioni
_, _ = display_df(df_cudf, dframe=1)
_, _ = display_info(df_cudf, dframe=1)
_, _ = describe_df(df_cudf, dframe=1)

A questo punto abbiamo finito con l’ingegneria delle feature.

Esecuzione singola

In sintesi, i nostri sforzi di ingegneria delle feature si sono concentrati sui seguenti compiti:

  1. Caricamento dei data frame
  2. Unione dei data frame
  3. Cambiamento del tipo di dato
  4. Ottenimento di time_id unici
  5. Verifica dei valori nulli
  6. Aggiunta di colonne
  7. Rimozione di colonne
  8. Calcolo di statistiche
  9. Calcolo di una correlazione
  10. Rinomina delle colonne
  11. Divisione di una colonna in fasce
  12. Visualizzazione dei data frame
  13. Visualizzazione delle informazioni sui data frame
  14. Descrizione dei data frame

Sono stati 13 i compiti, ma abbiamo menzionato “Calcolo di una correlazione” come entità separata qui. Ora vogliamo eseguire questi compiti in sequenza in una singola esecuzione, come mostrato di seguito:

def run_and_report():

   # Crea un dizionario per memorizzare i tempi trascorsi
   time_dict = defaultdict(list)

   # Elenco delle operazioni da eseguire
   labels = ["changing_dtype",
             "getting_unique_timeids",
             "checking_null_values",
             "adding_column",
             "dropping_column",
             "calculating_agg_stats",
             "merging_dataframes",
             "renaming_columns",
             "binning_col",
             "calculating_corr",
             "displaying_dfs",
             "displaying_info",
             "describing_dfs"]

   # Carica il data frame degli ordini di Pandas e calcola il tempo
   df_pd_order, pd_order_loading_time = load_dataframe(order_files, dframe=0)
   print("-"*150, "\n")

   # Carica il data frame degli ordini cuDF e calcola il tempo
   df_cudf_order, cudf_order_loading_time = load_dataframe(order_files, dframe=1)
   print("-"*150, "\n")

   # Carica il data frame degli scambi di Pandas e calcola il tempo
   df_pd_trade, pd_trade_loading_time = load_dataframe(trade_files, dframe=0)
   print("-"*150, "\n")

   # Carica il data frame degli scambi cuDF e calcola il tempo
   df_cudf_trade, cudf_trade_loading_time = load_dataframe(trade_files, dframe=1)
   print("-"*150, "\n")

   # Ottieni time_id dal data frame di Pandas
   _, time_ids = get_unique_timeids(df_pd_order, dframe=0)
   print("-"*150, "\n")

   # Ottieni time_id dal data frame cuDF
   _, time_ids = get_unique_timeids(df_cudf_order, dframe=1)
   print("-"*150, "\n")

   # Archivia i tempi di caricamento
   time_dict["loading_dfs"].extend([pd_order_loading_time, cudf_order_loading_time])

   # Forward fill per i data frame degli ordini
   expanded_df_pd_order, expanded_df_cudf_order = ffill(df_pd_order, df_name="order")

   # Forward fill per i data frame degli scambi
   expanded_df_pd_trade, expanded_df_cudf_trade = ffill(df_pd_trade, df_name="trade")

   # Unisci i data frame degli ordini e degli scambi di Pandas
   df_pd, pd_merge_time = merge_dataframes(expanded_df_pd_order, expanded_df_pd_trade, dframe=0)
   print("-"*150, "\n")

   # Unisci i data frame degli ordini e degli scambi di cuDF
   df_cudf, cudf_merge_time = merge_dataframes(expanded_df_cudf_order, expanded_df_cudf_trade, dframe=1)
   print("-"*150, "\n")

   # Archivia i tempi di unione
   time_dict["merging_dfs"].extend([pd_merge_time, cudf_merge_time])

   # Applica le funzioni
   functions = [change_dtype,
                get_unique_timeids,
                check_null_values,
                add_column,
                drop_column,
                calc_agg_stats,
                merge_dataframes_2,
                rename_cols,
                bin_col,
                calc_corr,
                display_df,
                display_info,
                describe_df]

   for label, function in enumerate(functions):

     # Funzione per Pandas
     start_pd = time.time()
     df_pd, x = function(df_pd, dframe=0)
     end_pd = time.time()
     elapsed_time_for_pd = round(end_pd-start_pd, 3)
     print(f"Per il data frame di Pandas: \n tempo di inizio: {start_pd} \n tempo di fine: {end_pd} \n tempo trascorso: {elapsed_time_for_pd} \n")     

     # Funzione per cuDF
     start_cudf = time.time()
     df_cudf, x = function(df_cudf, dframe=1)
     end_cudf = time.time()
     elapsed_time_for_cudf = round(end_cudf-start_cudf, 3)
     print(f"Per il data frame cuDF: \n tempo di inizio: {start_cudf} \n tempo di fine: {end_cudf} \n tempo trascorso: {elapsed_time_for_cudf} \n")
     print("-"*150, "\n")

     # Archivia i tempi trascorsi
     time_dict[labels[label]].extend([elapsed_time_for_pd, elapsed_time_for_cudf])

   # Elimina la durata del tempo non richiesta
   del time_dict["merging_dataframes"]
   labels.remove("merging_dataframes")
   labels.insert(0, "merging_dfs")
   labels.insert(0, "loading_dfs")

   print(time_dict)

   return time_dict, labels, df_pd, df_cudf

La funzione run_and_report darà gli stessi output di prima ma in un report completo tramite un singolo comando di esecuzione. Eseguirà le 14 attività sia su Pandas che su cuDF e registrerà i tempi impiegati per entrambi i data frame.

time_dict, labels, df_pd, df_cudf = run_and_report()

Potremmo dover eseguire più cicli per vedere più chiaramente la differenza di prestazioni tra le due librerie di dati.

Valutazione Finale

Se eseguiamo la funzione run_and_report più volte, ad esempio in cicli, possiamo avere una migliore comprensione della differenza di prestazioni tra Pandas e cuDF. Quindi impostiamo i cicli a 30. Poi registriamo tutte le durate di tempo per ogni operazione, ciclo e libreria di dati e valutiamo infine i risultati:

def calc_exec_times():

   exec_times_by_round = {}

   # Calcola i tempi di esecuzione delle operazioni in ogni ciclo
   for round_no in range(1, ROUNDS+1):
     # cycle_no += 1
     time_dict, labels, df_pd, df_cudf = run_and_report()
     exec_times_by_round[round_no] = time_dict

   print("exec_times_by_round: ", exec_times_by_round)

   # Ottieni le durate per operazione per ogni data frame
   pd_summary, cudf_summary = get_statistics(exec_times_by_round, labels)

   # Ottieni le durate per ciclo per ogni data frame
   round_total = get_total(exec_times_by_round)
   print("\n"*3)

   # Grafica delle durate
   plt.style.use('dark_background')
   X_axis = np.arange(len(labels))

   # Grafica della durata media dell'operazione
   plot_avg_by_df(pd_summary, cudf_summary, labels, X_axis)
   print("\n"*3)

   # Grafica della durata totale e della differenza per operazione
   plot_diff_by_df(pd_summary, cudf_summary, labels)
   print("\n"*3)

   # Grafica della durata totale e della differenza per ciclo
   plot_total_by_df(round_total)
   print("\n"*3)

La funzione calc_exec_times esegue alcune attività. Innanzitutto chiama get_statistics per ottenere “la durata media e totale per operazione” per ogni libreria di dati in 30 cicli.

def get_statistics(exec_times_by_round, labels):

   # Separa e memorizza le statistiche di durata per data frame
   pd_performance = defaultdict(list)
   cudf_performance = defaultdict(list)

   # Ottieni e memorizza le durate per ogni operazione per data frame
   for label in labels:
     for key, values in exec_times_by_round.items():

       pd_performance[label].append(values[label][0])
       cudf_performance[label].append(values[label][1])

   print("pd_performance: ", pd_performance)
   print("cudf_performance: ", cudf_performance)

   # Calcola le durate medie e totali per ogni operazione per data frame
   pd_summary = {key: [round(sum(value), 3), round(np.average(value), 3)] for key, value in pd_performance.items()}
   cudf_summary = {key: [round(sum(value), 3), round(np.average(value), 3)] for key, value in cudf_performance.items()}

   print("pd_summary: ", pd_summary)
   print("cudf_summary: ", cudf_summary) 

   return pd_summary, cudf_summary

Successivamente, calcola la “durata totale per ciclo” per ogni libreria di dati.

def get_total(exec_times_by_round):

   def get_round_total(stat_list):

     # Ottieni la durata totale per ciclo per ogni data frame
     pd_round_total = round(sum([x[0] for x in stat_list]), 3)
     cudf_round_total = round(sum([x[1] for x in stat_list]), 3)

     return pd_round_total, cudf_round_total

   # Raccogli le durate totali per ciclo
   for key, value in exec_times_by_round.items():
     round_total = {key: get_round_total(list(value.values())) for key, value in exec_times_by_round.items()}

   print("round_total", round_total)

   return round_total

Infine, grafica i risultati. Qui, il primo grafico riguarda “la durata media per operazione” per entrambe le librerie.

def plot_avg_by_df(pd_summary, cudf_summary, labels, X_axis):

   # Dimensioni della figura
   fig = plt.subplots(figsize =(10, 4))

   # Durata media per operazione per ogni data frame
   pd_avg = [value[1] for key, value in pd_summary.items()]
   cudf_avg = [value[1] for key, value in cudf_summary.items()]

   plt.bar(X_axis - 0.2, pd_avg, 0.4, color = '#5A5AAF', label = 'pandas', align='center')
   plt.bar(X_axis + 0.2, cudf_avg, 0.4, color = '#C8C8FF', label = 'cuDF', align='center')

   plt.xticks(X_axis, labels, fontsize=9, rotation=90)
   plt.yticks(fontsize=9)
   plt.xlabel("Operazioni", fontsize=10)
   plt.ylabel("Durata Media in Secondi", fontsize=10)
   plt.grid(axis='y', color="#E4E4E4", alpha=0.5)
   plt.title("Durata Media delle Operazioni per Data Frame", fontsize=12)
   plt.legend()
   plt.show()

Esposizione-19: Durata Media Per Operazione Per il Data Frame di Pandas e cuDF (Immagine dell’Autore)

Il secondo grafico è per la “durata totale per operazione”, che mostra il tempo totale impiegato per ogni operazione su tutte e 30 le iterazioni.

def plot_diff_by_df(pd_summary, cudf_summary, labels):

   # Dimensioni della figura
   fig = plt.subplots(figsize =(12, 6))

   # Durata totale per operazione per ogni data frame
   pd_total = [value[0] for key, value in pd_summary.items()]
   cudf_total = [value[0] for key, value in cudf_summary.items()]

   # Differenza di durata totale per operazione per ogni data frame
   diff = [x[0]-x[1] for x in zip(pd_total, cudf_total)]

   # Imposta la larghezza delle barre
   barWidth = 0.25

   # Imposta la posizione delle barre sull'asse X
   br1 = np.arange(len(labels))
   br2 = [x + barWidth for x in br1]
   br3 = [x + barWidth for x in br2]

   plt.bar(br1, pd_total, barWidth, color = '#5A5AAF', label = 'pandas', align='center')
   plt.bar(br2, cudf_total, barWidth, color = '#C8C8FF', label = 'cuDF', align='center')
   plt.bar(br3, diff, barWidth, color = '#AA1E1E', label = 'differenza', align='center')

   plt.xticks([r + barWidth for r in range(len(labels))], labels, fontsize=9, rotation=90)
   plt.yticks(fontsize=9)
   plt.xlabel("Operazioni", fontsize=10)
   plt.ylabel("Durata Totale in Secondi", fontsize=10)
   plt.grid(axis='y', color="#E4E4E4", alpha=0.5)
   plt.title("Durata Totale delle Operazioni per il Data Frame", fontsize=12)
   plt.legend()
   plt.show()

Esposizione-20: Durata Totale Per Operazione in 30 Iterazioni Per il Data Frame di Pandas e cuDF (Immagine dell’Autore)

Il grafico finale è “durata totale per iterazione”, che mostra il tempo totale impiegato da tutte le operazioni per ogni iterazione.

def plot_total_by_df(round_total):

    # Dimensioni della figura
   fig = plt.subplots(figsize =(10, 6))

   X_axis = np.arange(1, ROUNDS+1)

   # Durata totale per operazione per ogni iterazione per ogni data frame
   pd_round_total = [value[0] for key, value in round_total.items()]
   cudf_round_total = [value[1] for key, value in round_total.items()]

   # Differenza di durata totale per operazione per ogni iterazione per ogni data frame
   diff = [x[0]-x[1] for x in zip(pd_round_total, cudf_round_total)]

   plt.plot(X_axis, pd_round_total, linestyle="-", linewidth=3, color = '#5A5AAF', label = "pandas")
   plt.plot(X_axis, cudf_round_total, linestyle="-", linewidth=3, color = '#B0B05A', label = "cuDF")
   plt.plot(X_axis, diff, linestyle="--", linewidth=3, color = '#AA1E1E', label = "differenza")

   plt.xticks(X_axis, fontsize=9)
   plt.yticks(fontsize=9)
   plt.xlabel("Iterazioni", fontsize=10)
   plt.ylabel("Durata Totale in Secondi", fontsize=10)
   plt.grid(axis='y', color="#E4E4E4", alpha=0.5)
   plt.title("Durata Totale per Iterazione", fontsize=12)
   plt.legend()
   plt.show()

Esposizione-21: Durata Totale di Tutte le Operazioni per Ogni Iterazione per il Data Frame di Pandas e cuDF (Immagine dell’Autore)

Anche se non abbiamo coperto tutte le operazioni di feature engineering eseguite sul dataset, sono le stesse o simili a quelle mostrate qui. Spiegando 14 operazioni individualmente, abbiamo cercato di documentare le prestazioni relative del data frame di Pandas e cuDF e di consentire la riproducibilità.

In tutti i casi tranne il calcolo della correlazione e la visualizzazione del data frame, cuDF supera Pandas. Questa superiorità di prestazioni diventa più evidente in compiti complessi come groupby, merge, agg e describe. Un altro punto è che il data frame di Pandas si stanca nel tempo quando aumentano le iterazioni, mentre cuDF segue un pattern più stabile.

Ricordiamo che abbiamo esaminato solo un’azione come esempio. Se elaboriamo tutti i 112 titoli, possiamo aspettarci un divario di prestazioni più ampio a favore di cuDF. Se la popolazione delle azioni sale a centinaia, le prestazioni di cuDF possono addirittura essere più drammatiche. Nel caso dei big data, dove l’esecuzione di attività parallele è possibile, un framework distribuito come Dask-cuDF, che estende l’elaborazione parallela ai DataFrame GPU di cuDF, può essere lo strumento giusto.

Riferimenti

[1] Definizione di RAPIDS, https://www.heavy.ai/technical-glossary/rapids

[2] 10 minuti per cuDF e Dask-cuDF, https://docs.rapids.ai/api/cudf/stable/user_guide/10min/

[3] Optiver Realized Volatility Prediction, https://www.kaggle.com/competitions/optiver-realized-volatility-prediction/data

[4] Inserimento di dati di un libro con forward filling, https://www.kaggle.com/competitions/optiver-realized-volatility-prediction/discussion/251277 Hasan Serdar Altan è un data scientist e un AWS Cloud Architect Associato.