Una guida completa del Distributed Data Parallel (DDP)

Una guida completa al Distributed Data Parallel (DDP)

Una guida completa su come velocizzare l’addestramento dei tuoi modelli con Distributed Data Parallel (DDP)

Immagine dell'autore

Introduzione

Ciao a tutti! Sono Francois, un ricercatore scientifico presso Meta. Benvenuti in questo nuovo tutorial che fa parte della serie Awesome AI Tutorials.

In questo tutorial scopriremo una tecnica ben nota chiamata DDP per addestrare modelli su più GPU contemporaneamente.

Durante i miei giorni all’università, ricordo di aver sfruttato le GPU di Google Colab per l’addestramento. Tuttavia, nel mondo aziendale la situazione è diversa. Se fai parte di un’azienda che investe molto nell’IA, in particolare se sei all’interno di una grande azienda tecnologica, probabilmente hai a disposizione una vasta gamma di cluster di GPU.

Questa sessione ha lo scopo di fornirti le conoscenze necessarie per sfruttare la potenza di più GPU, consentendo un addestramento rapido ed efficiente. E indovina un po’? È più semplice di quanto si pensi! Prima di procedere, ti consiglio di avere una buona comprensione di PyTorch, compresi i suoi componenti principali come Datasets, DataLoaders, Optimizers, CUDA e il ciclo di addestramento.

Inizialmente, consideravo DDP come uno strumento complesso e quasi irraggiungibile, pensando che richiedesse un grande team per configurare l’infrastruttura necessaria. Tuttavia, ti assicuro che DDP non solo è intuitivo, ma anche conciso, richiedendo solo poche righe di codice per implementarlo. Imbarichiamoci in questo viaggio illuminante insieme!

Un’intuizione di alto livello di DDP

Distributed Data Parallel (DDP) è un concetto semplice una volta che lo scomponiamo. Immagina di avere a disposizione un cluster con 4 GPU. Con DDP, lo stesso modello viene caricato su ciascuna GPU, inclusi gli ottimizzatori. La differenza principale sta nel modo in cui distribuiamo i dati.

DDP, Immagine tratta dal tutorial di PyTorch

Se sei familiare con il deep learning, ricorderai il DataLoader, uno strumento che suddivide il tuo dataset in batch distinti. La norma è suddividere l’intero dataset in questi batch, aggiornando il modello dopo ogni calcolo del batch.

Zoomando ulteriormente, DDP affina questo processo suddividendo ogni batch in quello che possiamo definire come “sotto-batch”. Fondamentalmente, ogni replica del modello elabora un segmento del batch principale, risultando in un calcolo del gradiente distinto per ogni GPU.

In DDP suddividiamo questo batch in sotto-batch tramite uno strumento chiamato DistributedSampler, come illustrato nel seguente disegno:

DDP, Immagine tratta dal tutorial di PyTorch

Dopo la distribuzione di ogni sotto-batch alle singole GPU, ogni GPU calcola il suo gradiente univoco.

DDP, Immagine tratta dal tutorial di PyTorch
  • Ora viene la magia di DDP. Prima di aggiornare i parametri del modello, i gradienti calcolati su ciascuna GPU devono essere aggregati in modo che ogni GPU abbia il gradiente medio calcolato sull’intero batch di dati.
  • Questo viene fatto prendendo i gradienti da tutte le GPU e facendo la media. Ad esempio, se hai 4 GPU, il gradiente medio per un particolare parametro del modello è dato dalla somma dei gradienti per quel parametro su ciascuna delle 4 GPU divisa per 4.
  • DDP utilizza il backend NCCL o Gloo (NCCL è ottimizzato per le GPU NVIDIA, Gloo è più generale) per comunicare ed effettuare la media dei gradienti tra le GPU in modo efficiente.
DDP, Immagine tratta dal tutorial PyTorch

Glossario sui termini, nodi e ranghi

Prima di immergerci nel codice, è fondamentale comprendere il vocabolario che useremo frequentemente. Sveliamo questi termini:

  • Nodo: Pensa a un nodo come a una potente macchina dotata di più GPU. Quando parliamo di un cluster, non si tratta semplicemente di un insieme di GPU messe insieme. Invece, sono organizzate in gruppi o “nodi”. Ad esempio, un nodo potrebbe ospitare 8 GPU.
  • Nodo Master: In un ambiente multi-nodo, di solito un nodo assume il comando. Questo “nodo master” gestisce compiti come la sincronizzazione, l’inizializzazione delle copie del modello, la supervisione del caricamento del modello e la gestione delle voci di log. Senza un nodo master, ogni GPU genererebbe i log in modo indipendente, portando al caos.
  • Rango Locale:Il termine “rango” può essere paragonato a un ID o a una posizione. Il rango locale si riferisce alla posizione o all’ID di una GPU all’interno del proprio nodo specifico (o macchina). È “locale” perché è limitato a quella particolare macchina.
  • Rango Globale:Guardando da una prospettiva più ampia, il rango globale identifica una GPU in tutti i nodi disponibili. È un identificatore univoco indipendentemente dalla macchina.
  • Dimensione Mondiale: In sostanza, si tratta di un conteggio di tutte le GPU disponibili per te in tutti i nodi. Semplicemente, è il prodotto del numero di nodi e del numero di GPU in ciascun nodo.

Per mettere le cose in prospettiva, se lavori solo con una macchina, le cose sono più semplici poiché il rango locale equivale al rango globale.

Per chiarire questo concetto con un’immagine:

Rango locale, immagine dal tutorial
Rango locale, immagine dal tutorial

Comprensione delle limitazioni di DDP:

Distributed Data Parallel (DDP) ha trasformato molti flussi di lavoro di deep learning, ma è essenziale comprendere i suoi limiti.

La limitazione principale di DDP sta nel suo consumo di memoria. Con DDP, ogni GPU carica una replica del modello, dell’ottimizzatore e del batch di dati corrispondente. Le memorie delle GPU generalmente variano da pochi GB a 80 GB per le GPU di alto livello.

Per modelli più piccoli, questo non è un problema. Tuttavia, quando ci si avventura nel campo dei Large Language Model (LLM) o architetture simili a GPT, i limiti di memoria di una singola GPU potrebbero essere insufficienti.

Nel campo della Computer Vision, mentre ci sono molti modelli leggeri, si presentano sfide quando si aumentano le dimensioni dei batch, specialmente in scenari che coinvolgono immagini 3D o compiti di rilevamento degli oggetti.

Entra in gioco il Fully Sharded Data Parallel (FSDP). Questo metodo estende i vantaggi di DDP non solo distribuendo i dati, ma anche distribuendo lo stato del modello e dell’ottimizzatore nelle memorie delle GPU. Anche se questo suona vantaggioso, FSDP aumenta la comunicazione tra le GPU, rallentando potenzialmente l’addestramento.

In sintesi:

  • Se il tuo modello e il batch corrispondente si adattano comodamente alla memoria di una GPU, DDP è la scelta migliore grazie alla sua velocità.
  • Per modelli di grandi dimensioni che richiedono più memoria, FSDP è la scelta più adatta. Tuttavia, tieni presente il suo compromesso: stai sacrificando la velocità per la memoria.

Perché dovresti preferire DDP rispetto a DP?

Se vai sul sito di PyTorch, ci sono effettivamente due opzioni: DP e DDP. Ma lo menziono solo perché non ti perdi o confondi: Usa semplicemente DDP, è più veloce e non è limitato a un singolo nodo.

Confronto da Pytorch tutorial

Percorso del codice:

L’implementazione del deep learning distribuito è più semplice di quanto si possa pensare. La bellezza risiede nel fatto che non sarai bloccato con configurazioni manuali della GPU o con le complessità della distribuzione del gradiente.

Troverai tutti i template e gli script su:

GitHub – FrancoisPorcher/awesome-ai-tutorials: La migliore collezione di tutorial di intelligenza artificiale per renderti un…

La migliore collezione di tutorial di intelligenza artificiale per renderti un boss della Scienza dei Dati! – GitHub …

github.com

Ecco una suddivisione dei passaggi che compiremo:

  1. Inizializzazione del processo: questo implica la designazione del nodo master, la specifica della porta e la configurazione del world_size.
  2. Impostazione del caricatore dati distribuiti: Cruciale in questa fase è la suddivisione di ogni batch sulle GPU disponibili. Ci assicureremo che i dati siano distribuiti in modo uniforme senza sovrapposizioni.
  3. Formazione/Test del modello: In sostanza, questa fase rimane in gran parte inalterata rispetto al processo con singola GPU.

Addestramento su 1 GPU 1 Nodo (baseline)

Per prima cosa, definiamo un codice di base che carica un dataset, crea un modello e lo allena su una singola GPU. Questo sarà il nostro punto di partenza:

import torchimport torch.nn.functional as Ffrom torch.utils.data import Dataset, DataLoaderfrom sklearn.datasets import load_winefrom sklearn.model_selection import train_test_splitfrom sklearn.preprocessing import StandardScalerimport numpy as npclass WineDataset(Dataset):    def __init__(self, data, targets):        self.data = data        self.targets = targets    def __len__(self):        return len(self.data)    def __getitem__(self, idx):        return torch.tensor(self.data[idx], dtype=torch.float), torch.tensor(self.targets[idx], dtype=torch.long)class SimpleNN(torch.nn.Module):    def __init__(self):        super(SimpleNN, self).__init__()        self.fc1 = torch.nn.Linear(13, 64)        self.fc2 = torch.nn.Linear(64, 3)    def forward(self, x):        x = F.relu(self.fc1(x))        x = self.fc2(x)        return xclass Trainer():    def __init__(self, model, train_data, optimizer, gpu_id, save_every):        self.model = model        self.train_data = train_data        self.optimizer = optimizer        self.gpu_id = gpu_id        self.save_every = save_every        self.losses = []    def _run_batch(self, source, targets):        self.optimizer.zero_grad()        output = self.model(source)        loss = F.cross_entropy(output, targets)        loss.backward()        self.optimizer.step()        return loss.item()    def _run_epoch(self, epoch):        total_loss = 0.0        num_batches = len(self.train_data)        for source, targets in self.train_data:            source = source.to(self.gpu_id)            targets = targets.to(self.gpu_id)            loss = self._run_batch(source, targets)            total_loss += loss        avg_loss = total_loss / num_batches        self.losses.append(avg_loss)        print(f"Epoch {epoch}, Loss: {avg_loss:.4f}")    def _save_checkpoint(self, epoch):        checkpoint = self.model.state_dict()        PATH = f"model_{epoch}.pt"        torch.save(checkpoint, PATH)        print(f"Epoch {epoch} | Model saved to {PATH}")    def train(self, max_epochs):        self.model.train()        for epoch in range(max_epochs):            self._run_epoch(epoch)            if epoch % self.save_every == 0:                self._save_checkpoint(epoch)def load_train_objs():    wine_data = load_wine()    X = wine_data.data    y = wine_data.target    # Normalizzazione e suddivisione    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)    scaler = StandardScaler().fit(X_train)    X_train = scaler.transform(X_train)    X_test = scaler.transform(X_test)    train_set = WineDataset(X_train, y_train)    test_set = WineDataset(X_test, y_test)    print("Esempio dal dataset:")    sample_data, sample_target = train_set[0]    print(f"Data: {sample_data}")    print(f"Target: {sample_target}")    model = SimpleNN()    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)    return train_set, model, optimizerdef prepare_dataloader(dataset, batch_size):    return DataLoader(dataset, batch_size=batch_size, pin_memory=True, shuffle=True)def main(device, total_epochs, save_every, batch_size):    dataset, model, optimizer = load_train_objs()    train_data = prepare_dataloader(dataset, batch_size)    trainer = Trainer(model, train_data, optimizer, device, save_every)    trainer.train(total_epochs)main(device=torch.device("cuda:0" if torch.cuda.is_available() else "cpu"), total_epochs=100, save_every=50, batch_size=32)

Allenamento su più GPU, 1 Nodo

Ora useremo tutte le GPU in un singolo nodo con i seguenti passaggi:

  1. Importare le librerie necessarie per l’allenamento distribuito.
  2. Inizializzare l’ambiente distribuito: (in particolare MASTER_ADDR e MASTER_PORT
  3. Avvolgere il modello con DDP utilizzando il wrapper DistributedDataParallel.
  4. Utilizzare il campionatore distribuito per assicurarsi che il dataset sia diviso tra le GPU in modo distribuito.
  5. Adattare la funzione principale per “spawnare” processi multi-GPU per l’allenamento.

Per le librerie, abbiamo bisogno di questo:

import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
import os

Quindi dobbiamo configurare ogni processo. Ad esempio, se abbiamo 8 GPU su 1 nodo, chiameremo le seguenti funzioni 8 volte, una per ogni GPU e con il giusto local_rank:

def ddp_setup(rank, world_size):
    """Configurare l'ambiente distribuito.
        Args:
        rank: Il rank del processo corrente. Identificatore univoco per ogni processo nell'allenamento distribuito.
        world_size: Numero totale di processi che partecipano all'allenamento distribuito.
    """
        # Indirizzo del nodo principale. Poiché stiamo facendo l'allenamento su un singolo nodo, è impostato su localhost.
    os.environ["MASTER_ADDR"] = "localhost"
        # Porta su cui il nodo principale si aspetta di ricevere le comunicazioni dai worker.
    os.environ["MASTER_PORT"] = "12355"
        # Inizializza il gruppo di processi. 
     # 'backend' specifica il backend di comunicazione da utilizzare, "nccl" è ottimizzato per l'allenamento su GPU.
    init_process_group(backend="nccl", rank=rank, world_size=world_size)
        # Imposta il dispositivo CUDA corrente sul dispositivo specificato (identificato dal rank).
    # In questo modo si garantisce che ogni processo utilizzi una GPU diversa in una configurazione multi-GPU.
    torch.cuda.set_device(rank)

Alcune spiegazioni sulla funzione:

  • MASTER_ADDR è il nome del server su cui viene eseguito il processo principale (o il processo con rank 0). Qui è localhost
  • MASTER_PORT: Specifica la porta su cui il processo principale è in ascolto per le connessioni dai worker o da altri processi. 12355 è arbitrario. Puoi scegliere qualsiasi numero di porta non utilizzato da un altro servizio sul tuo sistema e consentito dalle regole del tuo firewall.
  • torch.cuda.set_device(rank): Ciò garantisce che ogni processo utilizzi la propria GPU corrispondente

Quindi dobbiamo modificare leggermente la classe Trainer. Semplicemente avvolgeremo il modello con la funzione DDP:

class Trainer():
    def __init__(auto, model, train_data, optimizer, gpu_id, save_every):
        self.model = model.to(gpu_id)
        self.train_data = train_data
        self.optimizer = optimizer
        self.gpu_id = gpu_id
        self.save_every = save_every
        self.losses = [] 
                # Questo cambia
        self.model = DDP(self.model, device_ids=[gpu_id])

Il resto della classe Trainer è lo stesso, incredibile!

Ora dobbiamo modificare il dataloader, perché ricordiamo, dobbiamo suddividere il batch su ciascuna GPU:

def prepare_dataloader(dataset: Dataset, batch_size: int):
    return DataLoader(
        dataset,
        batch_size=batch_size,
        pin_memory=True,
        shuffle=False,
        sampler=DistributedSampler(dataset)
    )

Adesso possiamo modificare la funzione main, che verrà chiamata per ogni processo (quindi 8 volte nel nostro caso):

def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int):
    """Funzione principale di allenamento per la configurazione parallela di dati distribuiti (DDP).
        Args:
        rank (int): Il rank del processo corrente (0 <= rank < world_size). A ogni processo viene assegnato un rank univoco.
        world_size (int): Numero totale di processi coinvolti nell'allenamento distribuito.
        save_every (int): Frequenza di salvataggio del modello, in termini di epoche.
        total_epochs (int): Numero totale di epoche per l'allenamento.
        batch_size (int): Numero di campioni elaborati in un'iterazione (passaggio in avanti e all'indietro).
    """
        # Configura l'ambiente distribuito, inclusi l'indirizzo del master, la porta e il backend.
    ddp_setup(rank, world_size)
        # Carica gli oggetti di addestramento necessari: dataset, modello e ottimizzatore.
    dataset, model, optimizer = load_train_objs()
        # Prepara il caricatore di dati per l'allenamento distribuito. Suddivide il dataset tra i processi e gestisce la mescolatura.
    train_data = prepare_dataloader(dataset, batch_size)
        # Inizializza l'istanza del trainer con il modello, i dati e altre configurazioni caricate.
    trainer = Trainer(model, train_data, optimizer, rank, save_every)
        # Allena il modello per il numero specificato di epoche.
    trainer.train(total_epochs)
        # Ripulisci l'ambiente distribuito dopo il completamento dell'allenamento.
    destroy_process_group()

E infine, durante l’esecuzione dello script, dovremo lanciare i 8 processi. Questo viene fatto con la funzione mp.spawn():

if __name__ == "__main__":    import argparse    parser = argparse.ArgumentParser(description='lavoro di allenamento distribuito semplice')    parser.add_argument('total_epochs', type=int, help='Epoca totale per allenare il modello')    parser.add_argument('save_every', type=int, help='Ogni quanto salvare uno snapshot')    parser.add_argument('--batch_size', default=32, type=int, help='Dimensione del batch di input su ogni dispositivo (default: 32)')    args = parser.parse_args()        world_size = torch.cuda.device_count()    mp.spawn(main, args=(world_size, args.save_every, args.total_epochs, args.batch_size), nprocs=world_size)

Passo finale: Allenamento su più nodi

Se sei arrivato fino a qui, congratulazioni! Il passo finale consiste nel mettere in grado di reclutare tutte le GPU disponibili su nodi diversi. Ma se hai capito ciò che abbiamo fatto finora, questo è molto semplice.

La distinzione chiave quando si passa a più nodi è il passaggio da local_rank a global_rank. Questo è imperativo perché ogni processo richiede un identificatore univoco. Ad esempio, se stai lavorando con due nodi, ognuno con 8 GPU, entrambi i processi 0 e 9 avrebbero un local_rank di 0.

Il global_rank è dato dalla formula molto intuitiva:

global_rank = node_rank * world_size_per_node + local_rank

Quindi per prima cosa modifichiamo la funzione ddp_setup:

def ddp_setup(local_rank, world_size_per_node, node_rank):    os.environ["MASTER_ADDR"] = "MASTER_NODE_IP"  # <-- Sostituisci con l'IP del tuo nodo principale    os.environ["MASTER_PORT"] = "12355"      global_rank = node_rank * world_size_per_node + local_rank    init_process_group(backend="nccl", rank=global_rank, world_size=world_size_per_node*torch.cuda.device_count())    torch.cuda.set_device(local_rank)

E dobbiamo adattare la funzione main che ora prende anche il parametro wold_size_per_node:

def main(local_rank: int, world_size_per_node: int, save_every: int, total_epochs: int, batch_size: int, node_rank: int):    ddp_setup(local_rank, world_size_per_node, node_rank)    # ... (il resto della funzione main)

E infine adattiamo la funzione mp.spawn() con anche world_size_per_node:

if __name__ == "__main__":    import argparse    parser = argparse.ArgumentParser(description='lavoro di allenamento distribuito semplice')    parser.add_argument('total_epochs', type=int, help='Epoca totale per allenare il modello')    parser.add_argument('save_every', type=int, help='Ogni quanto salvare uno snapshot')    parser.add_argument('--batch_size', default=32, type=int, help='Dimensione del batch di input su ogni dispositivo (default: 32)')    parser.add_argument('--node_rank', default=0, type=int, help='Il rango del nodo nell'allenamento multi-nodo')    args = parser.parse_args()    world_size_per_node = torch.cuda.device_count()    mp.spawn(main, args=(world_size_per_node, args.save_every, args.total_epochs, args.batch_size, args.node_rank), nprocs=world_size_per_node)

Utilizzo di un cluster (SLURM)

Adesso sei pronto per inviare l’allenamento al cluster. È molto semplice, basta chiamare il numero di nodi desiderato.

Ecco un modello di script SLURM:

#!/bin/bash#SBATCH --job-name=DDPTraining       # Nome del lavoro#SBATCH --nodes=$1                   # Numero dei nodi specificato dall'utente#SBATCH --ntasks-per-node=1          # Assicura che solo un task venga eseguito per nodo#SBATCH --cpus-per-task=1            # Numero di core CPU per task#SBATCH --gres=gpu:1                 # Numero di GPU per nodo#SBATCH --time=01:00:00              # Tempo limite ore:min:sec (1 ora in questo esempio)#SBATCH --mem=4GB                    # Limite di memoria per GPU#SBATCH --output=training_%j.log     # Nome del file di output ed errori (%j si espande all'ID del job)#SBATCH --partition=gpu              # Specifica la partizione o codaesegui python3 your_python_script.py --total_epochs 10 --save_every 2 --batch_size 32 --node_rank $SLURM_NODEID

E ora puoi avviare l’addestramento dal terminale con il comando

sbatch train_net.sh 2  # per utilizzare 2 nodi

Congratulazioni, ce l’hai fatta!

Grazie per aver letto! Prima di andare:

Per altri tutorial fantastici, controlla la mia compilazione di tutorial sull’AI su Github

GitHub – FrancoisPorcher/awesome-ai-tutorials: La migliore collezione di tutorial sull’AI per renderti un…

La migliore collezione di tutorial sull’AI per renderti un esperto di Data Science! – GitHub…

github.com

Dovresti ricevere i miei articoli nella tua casella di posta. Iscriviti qui.

Se vuoi avere accesso ad articoli premium su VoAGI, ti basta una membership di $5 al mese. Se ti iscrivi con il mio link, mi supporti con una parte della tua quota senza costi aggiuntivi.

Se hai trovato questo articolo interessante e utile, ti preghiamo di seguirmi e lasciare un applauso per contenuti più approfonditi! Il tuo supporto mi aiuta a continuare a produrre contenuti che favoriscono la nostra comprensione collettiva.

Riferimenti