Previsione multivariata delle serie temporali probabilistiche con Informer
'Previsione multivariata delle serie temporali con Informer'
Introduzione
Qualche mese fa abbiamo introdotto il Time Series Transformer, che è il Transformer di base (Vaswani et al., 2017) applicato alla previsione e abbiamo mostrato un esempio per il compito di previsione probabilistica univariata (cioè prevedere singolarmente la distribuzione a 1 dimensione di ogni serie temporale). In questo post introduciamo il modello Informer (Zhou, Haoyi, et al., 2021), il miglior paper di AAAI21, che è ora disponibile in 🤗 Transformers. Mostreremo come utilizzare il modello Informer per il compito di previsione probabilistica multivariata, ossia prevedere la distribuzione di un vettore futuro di valori target di serie temporali. Si noti che questo funzionerà anche per il modello vanilla Time Series Transformer.
Previsione probabilistica multivariata di serie temporali
Per quanto riguarda l’aspetto modellistico della previsione probabilistica, il Transformer/Informer non richiederà alcuna modifica quando si tratta di serie temporali multivariate. Sia nel contesto univariato che multivariato, il modello riceverà una sequenza di vettori e quindi l’unica differenza sarà sul lato di output o di emissione.
Modellare la distribuzione condizionale congiunta completa di dati ad alta dimensionalità può comportare un costo computazionale elevato e quindi i metodi ricorrono a qualche approssimazione della distribuzione, la più semplice delle quali è modellare i dati come una distribuzione indipendente della stessa famiglia, o una qualche approssimazione a basso rango della covarianza completa, ecc. Qui invece ci limiteremo alle emissioni indipendenti (o diagonali) che sono supportate per le famiglie di distribuzioni che abbiamo implementato qui.
- Ethics and Society Newsletter #3 Apertura Etica da Hugging Face
- StackLLaMA Una guida pratica per addestrare LLaMA con RLHF
- Creazione di un Assistente di Codifica con StarCoder
Informer – Nel dettaglio
Basandosi sul Transformer di base (Vaswani et al., 2017), Informer utilizza due miglioramenti principali. Per comprendere questi miglioramenti, ricordiamo gli svantaggi del Transformer di base:
- Calcolo quadratico dell’attenzione canonica interna: Il Transformer di base ha una complessità computazionale di O(T^2 D), dove T è la lunghezza della serie temporale e D è la dimensione degli stati nascosti. Per la previsione di sequenze temporali lunghe (nota anche come problema LSTF), ciò potrebbe essere davvero computazionalmente costoso. Per risolvere questo problema, Informer utilizza un nuovo meccanismo di attenzione interna chiamato ProbSparse attention, che ha una complessità temporale e spaziale di O(T log T).
- Bottleneck di memoria durante l’impilamento dei layer: Quando si impilano N layer di codificatori/decodificatori, il Transformer di base ha un utilizzo di memoria di O(N T^2), il che limita la capacità del modello per sequenze lunghe. Informer utilizza un’operazione di Distilling per ridurre la dimensione dell’input tra i layer a metà. In questo modo, riduce l’utilizzo della memoria complessiva a O(N T log T).
Come si può vedere, la motivazione per il modello Informer è simile a Longformer (Beltagy et al., 2020), Sparse Transformer (Child et al., 2019) e altri paper NLP per ridurre la complessità quadratica del meccanismo di attenzione interna quando la sequenza di input è lunga. Adesso, immergiamoci nell’attenzione ProbSparse e nell’operazione di Distilling con esempi di codice.
Attenzione ProbSparse
L’idea principale di ProbSparse è che i punteggi di attenzione interna formano una distribuzione a lunga coda, in cui le query “attive” si trovano nei punteggi “head” e le query “pigre” si trovano nell’area “tail”. Per “query attiva” si intende una query q_i che contribuisce all’attenzione principale, mentre una query “pigra” forma un prodotto scalare che genera un’attenzione banale. Qui, q_i e k_i sono le righe i-esime nelle matrici di attenzione Q e K rispettivamente.
Dato l’idea di query “attive” e “pigre”, l’attenzione ProbSparse seleziona le query “attive” e crea una matrice di query ridotta Q_r_e_d_u_c_e_d che viene utilizzata per calcolare i pesi di attenzione in O(T log T). Vediamo questo più in dettaglio con un esempio di codice.
Ricordiamo la formula canonica dell’auto-attenzione:
Attention ( Q , K , V ) = softmax ( Q K T d k ) V \textrm{Attention}(Q, K, V) = \textrm{softmax}(\frac{QK^T}{\sqrt{d_k}} )V Attention ( Q , K , V ) = softmax ( d k Q K T ) V
Dove Q ∈ R L Q × d Q\in \mathbb{R}^{L_Q \times d} Q ∈ R L Q × d , K ∈ R L K × d K\in \mathbb{R}^{L_K \times d} K ∈ R L K × d e V ∈ R L V × d V\in \mathbb{R}^{L_V \times d} V ∈ R L V × d . Nota che nella pratica, la lunghezza di input delle query e delle chiavi sono tipicamente equivalenti nel calcolo dell’auto-attenzione, cioè L Q = L K = T L_Q = L_K = T L Q = L K = T dove T T T è la lunghezza della serie temporale. Pertanto, la moltiplicazione Q K T QK^T Q K T richiede una complessità computazionale di O ( T 2 ⋅ d ) O(T^2 \cdot d) O ( T 2 ⋅ d ) . Nell’auto-attenzione ProbSparse, il nostro obiettivo è creare una nuova matrice Q r e d u c e Q_{reduce} Q r e d u c e e definire:
ProbSparseAttention ( Q , K , V ) = softmax ( Q r e d u c e K T d k ) V \textrm{ProbSparseAttention}(Q, K, V) = \textrm{softmax}(\frac{Q_{reduce}K^T}{\sqrt{d_k}} )V ProbSparseAttention ( Q , K , V ) = softmax ( d k Q r e d u c e K T ) V
dove la matrice Q r e d u c e Q_{reduce} Q r e d u c e seleziona solo le prime u u u query “attive”. Qui, u = c ⋅ log L Q u = c \cdot \log L_Q u = c ⋅ lo g L Q e c c c è chiamato il fattore di campionamento iperparametro per l’auto-attenzione ProbSparse. Dal momento che Q r e d u c e Q_{reduce} Q r e d u c e seleziona solo le prime u u u query, la sua dimensione è c ⋅ log L Q × d c\cdot \log L_Q \times d c ⋅ lo g L Q × d , quindi la moltiplicazione Q r e d u c e K T Q_{reduce}K^T Q r e d u c e K T richiede solo O ( L K log L Q ) = O ( T log T ) O(L_K \log L_Q) = O(T \log T) O ( L K lo g L Q ) = O ( T lo g T ) .
Questo è buono! Ma come possiamo selezionare le prime u u u query “attive” per creare Q r e d u c e Q_{reduce} Q r e d u c e ? Definiamo la Misura di Sparsità delle Query.
Misura di Sparsità delle Query
La Misura di Sparsità delle Query M ( q i , K ) M(q_i, K) M ( q i , K ) viene utilizzata per selezionare le prime u u u query “attive” q i q_i q i in Q Q Q per creare Q r e d u c e Q_{reduce} Q r e d u c e . In teoria, le coppie dominanti ⟨ q i , k i ⟩ \langle q_i,k_i \rangle ⟨ q i , k i ⟩ incoraggiano la distribuzione di probabilità delle q i q_i q i “attive” a deviare dalla distribuzione uniforme come si può vedere nella figura qui sotto. Pertanto, la divergenza di Kullback-Leibler tra la distribuzione reale delle query e la distribuzione uniforme viene utilizzata per definire la misura di sparsità.
Nella pratica, la misura è definita come:
M ( q i , K ) = max j q i k j T d − 1 L k ∑ j = 1 L k q i k j T d M(q_i, K) = \max_j \frac{q_ik_j^T}{\sqrt{d}}-\frac{1}{L_k} \sum_{j=1}^{L_k}\frac{q_ik_j^T}{\sqrt{d}} M ( q i , K ) = j max d q i k j T − L k 1 j = 1 ∑ L k d q i k j T
La cosa importante da capire qui è quando M ( q i , K ) M(q_i, K) M ( q i , K ) è più grande, la query q i q_i q i dovrebbe essere in Q r e d u c e Q_{reduce} Q r e d u c e e viceversa.
Ma come possiamo calcolare il termine q i k j T q_ik_j^T q i k j T in tempo non quadratico? Ricorda che la maggior parte dei prodotti scalari ⟨ q i , k i ⟩ \langle q_i,k_i \rangle ⟨ q i , k i ⟩ generano in entrambi i modi l’attenzione banale (cioè la proprietà di distribuzione a lunga coda), quindi è sufficiente campionare casualmente un sottoinsieme di chiavi da K K K , che verrà chiamato K_sample
nel codice.
Ora, siamo pronti a vedere il codice di probsparse_attention
:
from torch import nn
import math
def probsparse_attention(query_states, key_states, value_states, sampling_factor=5):
"""
Calcola l'attenzione self-attention sparsa delle probabilità.
Formato di input: Batch x Tempo x Canale
Notare l'input aggiuntivo `sampling_factor`.
"""
# ottieni le dimensioni degli input con i logaritmi
L_K = key_states.size(1)
L_Q = query_states.size(1)
log_L_K = np.ceil(np.log1p(L_K)).astype("int").item()
log_L_Q = np.ceil(np.log1p(L_Q)).astype("int").item()
# calcola un sottoinsieme di campioni da tagliare da K e crea Q_K_sample
U_part = min(sampling_factor * L_Q * log_L_K, L_K)
# crea Q_K_sample (il termine q_i * k_j^T nella misurazione della sparsità)
index_sample = torch.randint(0, L_K, (U_part,))
K_sample = key_states[:, index_sample, :]
Q_K_sample = torch.bmm(query_states, K_sample.transpose(1, 2))
# calcola la misurazione di sparsità della query con Q_K_sample
M = Q_K_sample.max(dim=-1)[0] - torch.div(Q_K_sample.sum(dim=-1), L_K)
# calcola u per trovare le query Top-u in base alla misurazione di sparsità
u = min(sampling_factor * log_L_Q, L_Q)
M_top = M.topk(u, sorted=False)[1]
# calcola Q_reduce come query_states[:, M_top]
dim_for_slice = torch.arange(query_states.size(0)).unsqueeze(-1)
Q_reduce = query_states[dim_for_slice, M_top] # dimensione: c*log_L_Q x canale
# e ora, come il canonical
d_k = query_states.size(-1)
attn_scores = torch.bmm(Q_reduce, key_states.transpose(-2, -1)) # Q_reduce x K^T
attn_scores = attn_scores / math.sqrt(d_k)
attn_probs = nn.functional.softmax(attn_scores, dim=-1)
attn_output = torch.bmm(attn_probs, value_states)
return attn_output, attn_scores
Nota che nell’implementazione, U p a r t U_{part} U p a r t contiene L Q L_Q L Q nel calcolo, per problemi di stabilità (vedi questa discussione per ulteriori informazioni).
Ci siamo riusciti! Si prega di notare che questa è solo un’implementazione parziale di probsparse_attention
, e l’implementazione completa si trova in 🤗 Transformers.
Distillazione
A causa della self-attention ProbSparse, la mappa delle caratteristiche dell’encoder ha una certa ridondanza che può essere rimossa. Pertanto, l’operazione di distillazione viene utilizzata per ridurre la dimensione di input tra i livelli dell’encoder a metà, eliminando così questa ridondanza in teoria. Nella pratica, l’operazione “distillazione” di Informer aggiunge semplicemente strati di convoluzione 1D con max pooling tra ciascuno dei livelli dell’encoder. Sia X n X_n X n l’output del n -esimo livello dell’encoder, l’operazione di distillazione è quindi definita come:
X n + 1 = MaxPool ( ELU ( Conv1d ( X n ) ) X_{n+1} = \textrm{MaxPool} ( \textrm{ELU}(\textrm{Conv1d}(X_n)) X n + 1 = MaxPool ( ELU ( Conv1d ( X n ) )
Vediamo questo nel codice:
from torch import nn
# ConvLayer è una classe con un passaggio in avanti che applica ELU e MaxPool1d
def informer_encoder_forward(x_input, num_encoder_layers=3, distil=True):
# Inizializza gli strati di convoluzione
if distil:
conv_layers = nn.ModuleList([ConvLayer() per _ in range(num_encoder_layers - 1)])
conv_layers.append(None)
else:
conv_layers = [None] * num_encoder_layers
# Applica conv_layer tra ogni encoder_layer
per encoder_layer, conv_layer in zip(encoder_layers, conv_layers):
output = encoder_layer(x_input)
if conv_layer is not None:
output = conv_layer(loutput)
return output
Riducendo l’input di ogni livello di due, otteniamo un utilizzo della memoria di O ( N ⋅ T log T ) invece di O ( N ⋅ T 2 ) dove N è il numero di livelli dell’encoder/decoder. Questo è ciò che volevamo!
Il modello Informer è ora disponibile nella libreria 🤗 Transformers, e viene semplicemente chiamato InformerModel
. Nelle sezioni seguenti, mostreremo come addestrare questo modello su un dataset multivariato di serie temporali personalizzato.
Configurare l’ambiente
Prima di tutto, installiamo le librerie necessarie: 🤗 Transformers, 🤗 Datasets, 🤗 Evaluate, 🤗 Accelerate e GluonTS.
Come mostreremo, GluonTS verrà utilizzato per trasformare i dati e creare le feature, nonché per creare i batch di addestramento, validazione e test appropriati.
!pip install -q transformers datasets evaluate accelerate gluonts ujson
Caricare il dataset
In questo post, utilizzeremo il dataset traffic_hourly
, che è disponibile su Hugging Face Hub. Questo dataset contiene il dataset sul traffico di San Francisco utilizzato da Lai et al. (2017). Contiene 862 serie temporali orarie che mostrano i tassi di occupazione stradale nell’intervallo [0, 1] nell’area delle autostrade della baia di San Francisco dal 2015 al 2016.
Questo dataset fa parte del repository Monash Time Series Forecasting, una collezione di dataset di serie temporali provenienti da diversi domini. Può essere considerato il benchmark GLUE per la previsione delle serie temporali.
from datasets import load_dataset
dataset = load_dataset("monash_tsf", "traffic_hourly")
Come si può vedere, il dataset contiene 3 divisioni: train, validation e test.
dataset
>>> DatasetDict({
train: Dataset({
features: ['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'],
num_rows: 862
})
test: Dataset({
features: ['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'],
num_rows: 862
})
validation: Dataset({
features: ['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'],
num_rows: 862
})
})
Ogni esempio contiene alcune chiavi, di cui start
e target
sono le più importanti. Diamo un’occhiata alla prima serie temporale nel dataset:
train_example = dataset["train"][0]
train_example.keys()
>>> dict_keys(['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'])
Il start
indica semplicemente l’inizio della serie temporale (come una data e ora), e il target
contiene i valori effettivi della serie temporale.
Il start
sarà utile per aggiungere al valore della serie temporale delle feature relative al tempo, come input aggiuntivo per il modello (come ad esempio “mese dell’anno”). Dal momento che conosciamo la frequenza dei dati, che è hourly
, sappiamo ad esempio che il secondo valore ha il timestamp 2015-01-01 01:00:01
, 2015-01-01 02:00:01
, ecc.
print(train_example["start"])
print(len(train_example["target"]))
>>> 2015-01-01 00:00:01
17448
Il set di validazione contiene gli stessi dati del set di addestramento, ma per un periodo di tempo più lungo pari a prediction_length
. Ciò ci permette di confrontare le previsioni del modello con la verità di riferimento.
Il set di test è ancora più lungo rispetto al set di validazione, di una lunghezza pari a prediction_length
(o di una lunghezza pari a un multiplo di prediction_length
per il test su più finestre mobili).
validation_example = dataset["validation"][0]
validation_example.keys()
>>> dict_keys(['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'])
I valori iniziali sono esattamente gli stessi dell’esempio di allenamento corrispondente. Tuttavia, questo esempio ha prediction_length=48
(48 ore, o 2 giorni) valori aggiuntivi rispetto all’esempio di allenamento. Verifichiamolo.
freq = "1H"
prediction_length = 48
assert len(train_example["target"]) + prediction_length == len(
dataset["validation"][0]["target"]
)
Visualizziamolo:
import matplotlib.pyplot as plt
num_of_samples = 150
figure, axes = plt.subplots()
axes.plot(train_example["target"][-num_of_samples:], color="blue")
axes.plot(
validation_example["target"][-num_of_samples - prediction_length :],
color="red",
alpha=0.5,
)
plt.show()
Suddividiamo i dati:
train_dataset = dataset["train"]
test_dataset = dataset["test"]
Aggiornare start
a pd.Period
La prima cosa che faremo è convertire la caratteristica start
di ogni serie temporale in un indice di pandas Period
utilizzando la frequenza dei dati freq
:
from functools import lru_cache
import pandas as pd
import numpy as np
@lru_cache(10_000)
def convert_to_pandas_period(date, freq):
return pd.Period(date, freq)
def transform_start_field(batch, freq):
batch["start"] = [convert_to_pandas_period(date, freq) for date in batch["start"]]
return batch
Ora utilizziamo la funzionalità ‘set_transform’ di ‘datasets’ per farlo in modo dinamico sul posto:
from functools import partial
train_dataset.set_transform(partial(transform_start_field, freq=freq))
test_dataset.set_transform(partial(transform_start_field, freq=freq))
Ora convertiamo il dataset in una serie temporale multivariata utilizzando il MultivariateGrouper
di GluonTS. Questo raggruppatore convertirà le singole serie temporali unidimensionali in una singola matrice bidimensionale.
from gluonts.dataset.multivariate_grouper import MultivariateGrouper
num_of_variates = len(train_dataset)
train_grouper = MultivariateGrouper(max_target_dim=num_of_variates)
test_grouper = MultivariateGrouper(
max_target_dim=num_of_variates,
num_test_dates=len(test_dataset) // num_of_variates, # numero di finestre di test in scorrimento
)
multi_variate_train_dataset = train_grouper(train_dataset)
multi_variate_test_dataset = test_grouper(test_dataset)
Si noti che il target è ora bidimensionale, dove la prima dimensione è il numero di variabili (numero di serie temporali) e la seconda è il valore delle serie temporali (dimensione temporale):
multi_variate_train_example = multi_variate_train_dataset[0]
print("multi_variate_train_example["target"].shape =", multi_variate_train_example["target"].shape)
>>> multi_variate_train_example["target"].shape = (862, 17448)
Definire il Modello
Successivamente, istanziamo un modello. Il modello verrà addestrato da zero, quindi non useremo il metodo from_pretrained
qui, ma inizieremo il modello in modo casuale da una config
.
Specifichiamo alcuni parametri aggiuntivi per il modello:
prediction_length
(nel nostro caso,48
ore): questo è l’orizzonte che il decodificatore dell’Informer imparerà a prevedere;context_length
: il modello imposterà ilcontext_length
(input dell’encoder) uguale alprediction_length
, se non viene specificato alcuncontext_length
;lags
per una data frequenza: questi specificano un efficiente meccanismo di “look back”, in cui concateniamo i valori dal passato ai valori attuali come ulteriori caratteristiche, ad esempio per una frequenzaDaily
potremmo considerare un look back di[1, 7, 30, ...]
o per datiMinute
potremmo considerare[1, 30, 60, 60*24, ...]
ecc.;- il numero di caratteristiche temporali: nel nostro caso, saranno
5
poiché aggiungeremo le caratteristicheHourOfDay
,DayOfWeek
, …, eAge
(vedi sotto).
Verifichiamo i ritardi predefiniti forniti da GluonTS per la frequenza data (“oraria”):
from gluonts.time_feature import get_lags_for_frequency
lags_sequence = get_lags_for_frequency(freq)
print(lags_sequence)
>>> [1, 2, 3, 4, 5, 6, 7, 23, 24, 25, 47, 48, 49, 71, 72, 73, 95, 96, 97, 119, 120,
121, 143, 144, 145, 167, 168, 169, 335, 336, 337, 503, 504, 505, 671, 672, 673, 719, 720, 721]
Questo significa che guarderà indietro fino a 721 ore (~30 giorni) per ogni passo temporale, come caratteristiche aggiuntive. Tuttavia, il vettore delle caratteristiche risultante avrà una dimensione len(lags_sequence)*num_of_variates
che nel nostro caso sarà 34480! Questo non funzionerà, quindi useremo i nostri ritardi sensibili.
Verifichiamo anche le caratteristiche temporali predefinite fornite da GluonTS:
from gluonts.time_feature import time_features_from_frequency_str
time_features = time_features_from_frequency_str(freq)
print(time_features)
>>> [<function hour_of_day at 0x7f3809539240>, <function day_of_week at 0x7f3809539360>, <function day_of_month at 0x7f3809539480>, <function day_of_year at 0x7f38095395a0>]
In questo caso, ci sono quattro caratteristiche aggiuntive, ovvero “ora del giorno”, “giorno della settimana”, “giorno del mese” e “giorno dell’anno”. Questo significa che per ogni passo temporale, aggiungeremo queste caratteristiche come valori scalari. Ad esempio, consideriamo il timestamp 2015-01-01 01:00:01
. Le quattro caratteristiche aggiuntive saranno:
from pandas.core.arrays.period import period_array
timestamp = pd.Period("2015-01-01 01:00:01", freq=freq)
timestamp_as_index = pd.PeriodIndex(data=period_array([timestamp]))
additional_features = [
(time_feature.__name__, time_feature(timestamp_as_index))
for time_feature in time_features
]
print(dict(additional_features))
>>> {'hour_of_day': array([-0.45652174]), 'day_of_week': array([0.]), 'day_of_month': array([-0.5]), 'day_of_year': array([-0.5])}
Si noti che le ore e i giorni sono codificati come valori compresi tra [-0.5, 0.5]
da GluonTS. Per ulteriori informazioni sulle time_features
, consultare questo . Oltre a queste 4 caratteristiche, aggiungeremo anche una caratteristica “età” come vedremo più avanti nelle trasformazioni dei dati.
Ora abbiamo tutto per definire il modello:
from transformers import InformerConfig, InformerForPrediction
config = InformerConfig(
# nel contesto multivariato, input_size è il numero di variabili nella serie temporale per passo temporale
input_size=num_of_variates,
# lunghezza della previsione:
prediction_length=prediction_length,
# lunghezza del contesto:
context_length=prediction_length * 2,
# valore dei ritardi copiato da 1 settimana prima:
lags_sequence=[1, 24 * 7],
# aggiungeremo 5 caratteristiche temporali ("hour_of_day", ..., e "età"):
num_time_features=len(time_features) + 1,
# parametri di Informer:
dropout=0.1,
encoder_layers=6,
decoder_layers=4,
# proietta l'input da num_of_variates*len(lags_sequence)+num_time_features a:
d_model=64,
)
model = InformerForPrediction(config)
Per impostazione predefinita, il modello utilizza una distribuzione Student-t diagonale (ma questo è configurabile):
model.config.distribution_output
>>> 'student_t'
Definire le Trasformazioni
Successivamente, definiamo le trasformazioni per i dati, in particolare per la creazione delle caratteristiche temporali (basate sul dataset o universali).
Di nuovo, useremo la libreria GluonTS per questo. Definiamo una Chain
di trasformazioni (che è un po’ paragonabile a torchvision.transforms.Compose
per le immagini). Ci permette di combinare diverse trasformazioni in un unico flusso di lavoro.
from gluonts.time_feature import TimeFeature
from gluonts.dataset.field_names import FieldName
from gluonts.transform import (
AddAgeFeature,
AddObservedValuesIndicator,
AddTimeFeatures,
AsNumpyArray,
Chain,
ExpectedNumInstanceSampler,
InstanceSplitter,
RemoveFields,
SelectFields,
SetField,
TestSplitSampler,
Transformation,
ValidationSplitSampler,
VstackFeatures,
RenameFields,
)
Le trasformazioni di seguito sono annotate con commenti per spiegare cosa fanno. A livello generale, itereremo sulle singole serie temporali del nostro dataset e aggiungeremo/rimuoveremo campi o caratteristiche:
from transformers import PretrainedConfig
def create_transformation(freq: str, config: PretrainedConfig) -> Transformation:
# creiamo una lista di campi da rimuovere in seguito
remove_field_names = []
if config.num_static_real_features == 0:
remove_field_names.append(FieldName.FEAT_STATIC_REAL)
if config.num_dynamic_real_features == 0:
remove_field_names.append(FieldName.FEAT_DYNAMIC_REAL)
if config.num_static_categorical_features == 0:
remove_field_names.append(FieldName.FEAT_STATIC_CAT)
return Chain(
# passo 1: rimuovi campi statici/dinamici se non specificati
[RemoveFields(field_names=remove_field_names)]
# passo 2: converi i dati in NumPy (potenzialmente non necessario)
+ (
[
AsNumpyArray(
field=FieldName.FEAT_STATIC_CAT,
expected_ndim=1,
dtype=int,
)
]
if config.num_static_categorical_features > 0
else []
)
+ (
[
AsNumpyArray(
field=FieldName.FEAT_STATIC_REAL,
expected_ndim=1,
)
]
if config.num_static_real_features > 0
else []
)
+ [
AsNumpyArray(
field=FieldName.TARGET,
# ci aspettiamo una dimensione aggiuntiva per il caso multivariato:
expected_ndim=1 if config.input_size == 1 else 2,
),
# passo 3: gestisci i NaN riempiendo il target con zero
# e restituisci la maschera (che è nei valori osservati)
# vero per i valori osservati, falso per i NaN
# il decoder utilizza questa maschera (non si incorre in perdita per i valori non osservati)
# vedere loss_weights all'interno del modello xxxForPrediction
AddObservedValuesIndicator(
target_field=FieldName.TARGET,
output_field=FieldName.OBSERVED_VALUES,
),
# passo 4: aggiungi caratteristiche temporali in base alla frequenza del dataset
# queste servono come codifiche posizionali
AddTimeFeatures(
start_field=FieldName.START,
target_field=FieldName.TARGET,
output_field=FieldName.FEAT_TIME,
time_features=time_features_from_frequency_str(freq),
pred_length=config.prediction_length,
),
# passo 5: aggiungi un'altra caratteristica temporale (solo un numero singolo)
# dice al modello in quale punto della vita si trova il valore della serie temporale
# una sorta di contatore in esecuzione
AddAgeFeature(
target_field=FieldName.TARGET,
output_field=FieldName.FEAT_AGE,
pred_length=config.prediction_length,
log_scale=True,
),
# passo 6: impila verticalmente tutte le caratteristiche temporali nella chiave FEAT_TIME
VstackFeatures(
output_field=FieldName.FEAT_TIME,
input_fields=[FieldName.FEAT_TIME, FieldName.FEAT_AGE]
+ (
[FieldName.FEAT_DYNAMIC_REAL]
if config.num_dynamic_real_features > 0
else []
),
),
# passo 7: rinomina per corrispondere ai nomi di HuggingFace
RenameFields(
mapping={
FieldName.FEAT_STATIC_CAT: "static_categorical_features",
FieldName.FEAT_STATIC_REAL: "static_real_features",
FieldName.FEAT_TIME: "time_features",
FieldName.TARGET: "values",
FieldName.OBSERVED_VALUES: "observed_mask",
}
),
]
)
Definisci InstanceSplitter
Per addestramento/validazione/test creiamo successivamente un InstanceSplitter
che viene utilizzato per campionare finestre dal dataset (poiché, ricorda, non possiamo passare all’intera storia dei valori al modello a causa dei vincoli di tempo e memoria).
Lo splitter di istanze campiona finestre casuali di dimensione context_length
e finestre successive di dimensione prediction_length
dai dati e aggiunge una chiave past_
o future_
a tutte le chiavi temporali per le rispettive finestre. In questo modo, i values
verranno suddivisi nelle chiavi past_values
e future_values
, che fungeranno rispettivamente come input dell’encoder e del decoder. La stessa cosa accade per tutte le chiavi nell’argomento time_series_fields
:
from gluonts.transform.sampler import InstanceSampler
from typing import Optional
def create_instance_splitter(
config: PretrainedConfig,
mode: str,
train_sampler: Optional[InstanceSampler] = None,
validation_sampler: Optional[InstanceSampler] = None,
) -> Transformation:
assert mode in ["train", "validation", "test"]
instance_sampler = {
"train": train_sampler
or ExpectedNumInstanceSampler(
num_instances=1.0, min_future=config.prediction_length
),
"validation": validation_sampler
or ValidationSplitSampler(min_future=config.prediction_length),
"test": TestSplitSampler(),
}[mode]
return InstanceSplitter(
target_field="values",
is_pad_field=FieldName.IS_PAD,
start_field=FieldName.START,
forecast_start_field=FieldName.FORECAST_START,
instance_sampler=instance_sampler,
past_length=config.context_length + max(config.lags_sequence),
future_length=config.prediction_length,
time_series_fields=["time_features", "observed_mask"],
)
Crea i DataLoaders
Successivamente, è il momento di creare i DataLoaders, che ci permettono di avere batch di coppie (input, output) – o in altre parole ( past_values
, future_values
).
from typing import Iterable
import torch
from gluonts.itertools import Cached, Cyclic
from gluonts.dataset.loader import as_stacked_batches
def create_train_dataloader(
config: PretrainedConfig,
freq,
data,
batch_size: int,
num_batches_per_epoch: int,
shuffle_buffer_length: Optional[int] = None,
cache_data: bool = True,
**kwargs,
) -> Iterable:
PREDICTION_INPUT_NAMES = [
"past_time_features",
"past_values",
"past_observed_mask",
"future_time_features",
]
if config.num_static_categorical_features > 0:
PREDICTION_INPUT_NAMES.append("static_categorical_features")
if config.num_static_real_features > 0:
PREDICTION_INPUT_NAMES.append("static_real_features")
TRAINING_INPUT_NAMES = PREDICTION_INPUT_NAMES + [
"future_values",
"future_observed_mask",
]
transformation = create_transformation(freq, config)
transformed_data = transformation.apply(data, is_train=True)
if cache_data:
transformed_data = Cached(transformed_data)
# inizializziamo un'istanza Training
instance_splitter = create_instance_splitter(config, "train")
# lo splitter di istanze campionerà una finestra di
# lunghezza context + lags + prediction length (tra tutte le possibili serie temporali trasformate, 1 nel nostro caso)
# casualmente all'interno della serie temporale target e restituirà un iteratore.
stream = Cyclic(transformed_data).stream()
training_instances = instance_splitter.apply(
stream, is_train=True
)
return as_stacked_batches(
training_instances,
batch_size=batch_size,
shuffle_buffer_length=shuffle_buffer_length,
field_names=TRAINING_INPUT_NAMES,
output_type=torch.tensor,
num_batches_per_epoch=num_batches_per_epoch,
)
def create_test_dataloader(
config: PretrainedConfig,
freq,
data,
batch_size: int,
**kwargs,
):
PREDICTION_INPUT_NAMES = [
"past_time_features",
"past_values",
"past_observed_mask",
"future_time_features",
]
if config.num_static_categorical_features > 0:
PREDICTION_INPUT_NAMES.append("static_categorical_features")
if config.num_static_real_features > 0:
PREDICTION_INPUT_NAMES.append("static_real_features")
transformation = create_transformation(freq, config)
transformed_data = transformation.apply(data, is_train=False)
# creiamo uno splitter di istanze di Test che campionerà l'ultima
# finestra di contesto vista solo durante l'addestramento solo per l'encoder.
instance_sampler = create_instance_splitter(config, "test")
# applichiamo le trasformazioni in modalità di test
testing_instances = instance_sampler.apply(transformed_data, is_train=False)
return as_stacked_batches(
testing_instances,
batch_size=batch_size,
output_type=torch.tensor,
field_names=PREDICTION_INPUT_NAMES,
)
train_dataloader = create_train_dataloader(
config=config,
freq=freq,
data=multi_variate_train_dataset,
batch_size=256,
num_batches_per_epoch=100,
num_workers=2,
)
test_dataloader = create_test_dataloader(
config=config,
freq=freq,
data=multi_variate_test_dataset,
batch_size=32,
)
Controlliamo il primo batch:
batch = next(iter(train_dataloader))
for k, v in batch.items():
print(k, v.shape, v.type())
>>> past_time_features torch.Size([256, 264, 5]) torch.FloatTensor
past_values torch.Size([256, 264, 862]) torch.FloatTensor
past_observed_mask torch.Size([256, 264, 862]) torch.FloatTensor
future_time_features torch.Size([256, 48, 5]) torch.FloatTensor
future_values torch.Size([256, 48, 862]) torch.FloatTensor
future_observed_mask torch.Size([256, 48, 862]) torch.FloatTensor
Come si può vedere, non alimentiamo input_ids
e attention_mask
all’encoder (come avverrebbe per i modelli di NLP), ma piuttosto past_values
, insieme a past_observed_mask
, past_time_features
e static_real_features
.
Gli input del decoder consistono in future_values
, future_observed_mask
e future_time_features
. I future_values
possono essere considerati l’equivalente di decoder_input_ids
in NLP.
Per una spiegazione dettagliata di ognuno di essi, si faccia riferimento alla documentazione.
Passaggio in avanti
Eseguiamo un singolo passaggio in avanti con il batch appena creato:
# esegui passaggio in avanti
outputs = model(
past_values=batch["past_values"],
past_time_features=batch["past_time_features"],
past_observed_mask=batch["past_observed_mask"],
static_categorical_features=batch["static_categorical_features"]
if config.num_static_categorical_features > 0
else None,
static_real_features=batch["static_real_features"]
if config.num_static_real_features > 0
else None,
future_values=batch["future_values"],
future_time_features=batch["future_time_features"],
future_observed_mask=batch["future_observed_mask"],
output_hidden_states=True,
)
print("Loss:", outputs.loss.item())
>>> Loss: -1071.5718994140625
Si noti che il modello restituisce una loss. Questo è possibile in quanto il decoder sposta automaticamente i future_values
di una posizione a destra per avere le etichette. Ciò consente di calcolare una loss tra i valori predetti e le etichette. La loss è il logaritmo negativo della verosimiglianza della distribuzione predetta rispetto ai valori veri e tende a meno infinito.
Si noti anche che il decoder utilizza una maschera causale per non guardare nel futuro poiché i valori che deve predire sono nel tensore future_values
.
Allenare il modello
È ora il momento di allenare il modello! Utilizzeremo un ciclo di allenamento standard di PyTorch.
In questo caso, utilizzeremo la libreria 🤗 Accelerate, che posiziona automaticamente il modello, l’ottimizzatore e il dataloader sul dispositivo appropriato device
.
from accelerate import Accelerator
from torch.optim import AdamW
epochs = 25
loss_history = []
accelerator = Accelerator()
device = accelerator.device
model.to(device)
optimizer = AdamW(model.parameters(), lr=6e-4, betas=(0.9, 0.95), weight_decay=1e-1)
model, optimizer, train_dataloader = accelerator.prepare(
model,
optimizer,
train_dataloader,
)
model.train()
for epoch in range(epochs):
for idx, batch in enumerate(train_dataloader):
optimizer.zero_grad()
outputs = model(
static_categorical_features=batch["static_categorical_features"].to(device)
if config.num_static_categorical_features > 0
else None,
static_real_features=batch["static_real_features"].to(device)
if config.num_static_real_features > 0
else None,
past_time_features=batch["past_time_features"].to(device),
past_values=batch["past_values"].to(device),
future_time_features=batch["future_time_features"].to(device),
future_values=batch["future_values"].to(device),
past_observed_mask=batch["past_observed_mask"].to(device),
future_observed_mask=batch["future_observed_mask"].to(device),
)
loss = outputs.loss
# Backpropagation
accelerator.backward(loss)
optimizer.step()
loss_history.append(loss.item())
if idx % 100 == 0:
print(loss.item())
>>> -1081.978515625
...
-2877.723876953125
# visualizza l'allenamento
loss_history = np.array(loss_history).reshape(-1)
x = range(loss_history.shape[0])
plt.figure(figsize=(10, 5))
plt.plot(x, loss_history, label="train")
plt.title("Loss", fontsize=15)
plt.legend(loc="upper right")
plt.xlabel("iterazione")
plt.ylabel("nll")
plt.show()
Inferenza
All’atto dell’inferenza, è consigliabile utilizzare il metodo generate()
per la generazione auto-regressiva, simile ai modelli di NLP.
La previsione comporta l’ottenimento di dati dal campione dell’istanza di test, che campionerà la finestra di dimensione context_length
più recente di valori da ogni serie temporale nel dataset, e la passerà al modello. Si noti che passiamo future_time_features
, che sono conosciuti in anticipo, al decoder.
Il modello campionerà in modo auto-regressivo un certo numero di valori dalla distribuzione prevista e li passerà nuovamente al decoder per restituire le previsioni:
model.eval()
forecasts_ = []
for batch in test_dataloader:
outputs = model.generate(
static_categorical_features=batch["static_categorical_features"].to(device)
if config.num_static_categorical_features > 0
else None,
static_real_features=batch["static_real_features"].to(device)
if config.num_static_real_features > 0
else None,
past_time_features=batch["past_time_features"].to(device),
past_values=batch["past_values"].to(device),
future_time_features=batch["future_time_features"].to(device),
past_observed_mask=batch["past_observed_mask"].to(device),
)
forecasts_.append(outputs.sequences.cpu().numpy())
Il modello restituisce un tensore di forma (batch_size
, numero di campioni
, lunghezza della previsione
, dimensione dell'input
).
In questo caso, otteniamo 100
valori possibili per le prossime 48
ore per ciascuna delle 862
serie temporali (per ogni esempio nel batch che ha dimensione 1
poiché abbiamo solo una singola serie temporale multivariata):
forecasts_[0].shape
>>> (1, 100, 48, 862)
Li impileremo verticalmente per ottenere le previsioni per tutte le serie temporali presenti nel set di test (nel caso in cui ci siano più serie temporali nel set di test):
forecasts = np.vstack(forecasts_)
print(forecasts.shape)
>>> (1, 100, 48, 862)
Possiamo valutare la previsione risultante rispetto ai valori fuori campione del ground truth presenti nel set di test. Per fare ciò, utilizzeremo la libreria Evaluate di 🤗, che include le metriche MASE e sMAPE.
Calcoliamo entrambe le metriche per ciascuna variabile della serie temporale nel dataset:
from evaluate import load
from gluonts.time_feature import get_seasonality
mase_metric = load("evaluate-metric/mase")
smape_metric = load("evaluate-metric/smape")
forecast_median = np.median(forecasts, 1).squeeze(0).T
mase_metrics = []
smape_metrics = []
for item_id, ts in enumerate(test_dataset):
training_data = ts["target"][:-prediction_length]
ground_truth = ts["target"][-prediction_length:]
mase = mase_metric.compute(
predictions=forecast_median[item_id],
references=np.array(ground_truth),
training=np.array(training_data),
periodicity=get_seasonality(freq),
)
mase_metrics.append(mase["mase"])
smape = smape_metric.compute(
predictions=forecast_median[item_id],
references=np.array(ground_truth),
)
smape_metrics.append(smape["smape"])
print(f"MASE: {np.mean(mase_metrics)}")
>>> MASE: 1.1913437728068093
print(f"sMAPE: {np.mean(smape_metrics)}")
>>> sMAPE: 0.5322665081607634
plt.scatter(mase_metrics, smape_metrics, alpha=0.2)
plt.xlabel("MASE")
plt.ylabel("sMAPE")
plt.show()
Per tracciare la previsione per qualsiasi variabile della serie temporale rispetto ai dati di test del ground truth, definiamo il seguente aiuto:
import matplotlib.dates as mdates
def plot(ts_index, mv_index):
fig, ax = plt.subplots()
index = pd.period_range(
start=multi_variate_test_dataset[ts_index][FieldName.START],
periods=len(multi_variate_test_dataset[ts_index][FieldName.TARGET]),
freq=multi_variate_test_dataset[ts_index][FieldName.START].freq,
).to_timestamp()
ax.xaxis.set_minor_locator(mdates.HourLocator())
ax.plot(
index[-2 * prediction_length :],
multi_variate_test_dataset[ts_index]["target"][mv_index, -2 * prediction_length :],
label="effettivo",
)
ax.plot(
index[-prediction_length:],
forecasts[ts_index, ..., mv_index].mean(axis=0),
label="media",
)
ax.fill_between(
index[-prediction_length:],
forecasts[ts_index, ..., mv_index].mean(0)
- forecasts[ts_index, ..., mv_index].std(axis=0),
forecasts[ts_index, ..., mv_index].mean(0)
+ forecasts[ts_index, ..., mv_index].std(axis=0),
alpha=0.2,
interpolate=True,
label="+/- 1-deviazione standard",
)
ax.legend()
fig.autofmt_xdate()
Per esempio:
plot(0, 344)
Conclusione
Come ci confrontiamo con altri modelli? Il Monash Time Series Repository ha una tabella di confronto delle metriche MASE del set di test che possiamo aggiungere:
Come si può vedere, e forse sorprendentemente per alcuni, le previsioni multivariate sono tipicamente peggiori rispetto a quelle univariate, a causa della difficoltà nel calcolare le correlazioni/relazioni tra le serie. La varianza aggiuntiva introdotta dalle stime spesso danneggia le previsioni risultanti o il modello apprende correlazioni spurie. Per ulteriori informazioni, si rimanda a questo articolo. I modelli multivariati tendono a funzionare bene quando addestrati su molti dati.
Quindi il Transformer “vanilla” si comporta ancora meglio qui! In futuro, speriamo di poter valutare meglio questi modelli in un luogo centrale per facilitare la riproduzione dei risultati di diversi articoli. Restate sintonizzati per ulteriori aggiornamenti!
Risorse
Consigliamo di consultare la documentazione di Informer e il notebook di esempio collegato in cima a questo post.