Addestramento con più GPU in PyTorch e Accumulazione del Gradiente come alternativa ad esso

Addestramento con più GPU e Accumulazione del Gradiente in PyTorch

Codice e Teoria

https://unsplash.com/photos/vBzJ0UFOA70

In questo articolo andremo prima a vedere le differenze tra gli algoritmi di Parallelismo dei Dati (DP) e Parallelismo Distribuito dei Dati (DDP), poi spiegheremo cosa è l’Accumulo del Gradiente (GA) per infine mostrare come DDP e GA vengono implementati in PyTorch e come portano allo stesso risultato.

Introduzione

Nell’addestramento di una rete neurale profonda (DNN), un iperparametro importante è la dimensione del batch. Normalmente, la dimensione del batch non dovrebbe essere troppo grande perché la rete tenderebbe a sovradattarsi, ma nemmeno troppo piccola perché ciò comporterebbe una convergenza lenta. Quando si lavora con immagini ad alta risoluzione o altri tipi di dati che occupano molta memoria, assumendo che oggi la maggior parte dell’addestramento dei grandi modelli DNN venga effettuata su GPU, l’adattamento di una dimensione di batch piccola può essere problematico a seconda della memoria della GPU disponibile. Poiché, come abbiamo detto, le dimensioni del batch ridotte comportano una convergenza lenta, ci sono tre metodi principali che possiamo utilizzare per aumentare la dimensione effettiva del batch:

  1. Utilizzare più piccole GPU che eseguono il modello in parallelo su mini-batch – algoritmi DP o DDP
  2. Utilizzare una GPU più grande (costoso)
  3. Accumulare il gradiente nel corso di più passaggi

Andiamo ora a esaminare 1. e 3. in maggiori dettagli – se si è fortunati ad avere una grande GPU su cui è possibile adattare tutti i dati necessari, è possibile leggere la parte DDP e vedere come è implementata in PyTorch nella sezione Codice Completo saltando il resto.

Diciamo che vogliamo una dimensione effettiva del batch di 30 ma possiamo adattare solo 10 punti dati (dimensione mini-batch) su ogni GPU. Abbiamo due scelte: Parallelismo dei Dati o Parallelismo Distribuito dei Dati:

Parallelismo dei Dati (DP)

Prima di tutto, definiamo la GPU principale. Quindi, eseguiamo i seguenti passaggi:

  1. Spostiamo 10 punti dati (mini-batch) e la replica del modello su altre 2 GPU dalla GPU principale
  2. Eseguiamo un passaggio in avanti su ogni GPU e passiamo alla GPU principale le uscite
  3. Calcoliamo la perdita totale sulla GPU principale e quindi inviamo la perdita a ciascuna GPU per calcolare i gradienti dei parametri
  4. Inviare i gradienti indietro (questi sono la media dei gradienti per tutti gli esempi di addestramento) alla GPU principale, sommarli per ottenere il gradiente medio per l’intero batch di 30
  5. Aggiornare i parametri sulla GPU principale e inviare questi aggiornamenti alle altre 2 GPU per la prossima iterazione

Ci sono alcuni problemi e inefficienze con questo processo:

  • I dati vengono passati dalla GPU principale prima di essere divisi tra le altre GPU. Inoltre, la GPU principale viene utilizzata più delle altre GPU in quanto il calcolo della perdita totale e gli aggiornamenti dei parametri avvengono sulla GPU principale
  • È necessario sincronizzare i modelli sulle altre GPU ad ogni iterazione, il che può rallentare l’addestramento

Parallelismo Distribuito dei Dati (DDP)

Parallelismo Distribuito dei Dati è stato introdotto per migliorare le inefficienze dell’algoritmo di Parallelismo dei Dati. Abbiamo ancora le stesse impostazioni di prima – 30 punti dati per ogni batch con 3 GPU. Le differenze sono le seguenti:

  1. Non ha la GPU principale
  2. Poiché non abbiamo più la GPU principale, carichiamo i dati su ogni GPU in modo non sovrapposto in parallelo direttamente dal disco/RAM – DistributedSampler svolge questo compito per noi. Sottostante utilizza il rango locale (ID GPU) per distribuire i dati tra le GPU – dato un batch di 30 dati, la prima GPU utilizzerà i dati [0, 3, 6, …, 27], la seconda GPU [1, 4, 7, …, 28] e la terza GPU [2, 5, 8, …, 29]
n_gpu = 3for i in range(n_gpu):  print(np.arange(30)[i:30:n_gpu])

3. Il passaggio in avanti, il calcolo della perdita e i passaggi all’indietro vengono eseguiti su ogni GPU in modo indipendente e i gradienti vengono ridotti in modo asincrono calcolando la media e quindi l’aggiornamento segue su tutte le GPU

A causa dei vantaggi del DDP rispetto al DP, l’uso del DDP è preferito al giorno d’oggi, pertanto mostreremo solo l’implementazione del DDP.

Accumulo del gradiente

Se abbiamo solo una GPU ma vogliamo comunque utilizzare una dimensione del batch più grande, un’opzione alternativa è accumulare i gradienti per un certo numero di passaggi, accumulando efficacemente i gradienti per un certo numero di mini-batch aumentando la dimensione del batch effettiva. Dall’esempio precedente, potremmo accumulare i gradienti di 10 punti dati per 3 iterazioni per ottenere gli stessi risultati di quanto descritto nell’addestramento DDP con una dimensione del batch effettiva di 30.

Codice del processo DDP

Di seguito esaminerò solo le differenze nell’implementazione del DDP rispetto al codice con 1 GPU. Il codice completo può essere trovato in alcune sezioni più avanti. Prima di tutto inizializziamo il gruppo di processi che consente la comunicazione tra i diversi processi. Con int(os.environ[“LOCAL_RANK”]) otteniamo la GPU utilizzata in un dato processo.

init_process_group(backend="nccl")device = int(os.environ["LOCAL_RANK"])torch.cuda.set_device(device)

Successivamente, è necessario avvolgere il modello in DistributedDataParallel che consente l’addestramento su più GPU.

model = NeuralNetwork(args.data_size) model = model.to(device)  if args.distributed:  model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[device])

L’ultima parte consiste nel definire il DistributedSampler di cui ho parlato nella sezione DDP.

sampler = torch.utils.data.DistributedSampler(dataset)

Il resto dell’addestramento rimane lo stesso – includerò il codice completo alla fine di questo articolo.

Codice di accumulo del gradiente

Quando avviene la retropropagazione, dopo aver chiamato loss.backward(), i gradienti vengono memorizzati nei rispettivi Tensori. L’aggiornamento effettivo avviene quando viene chiamato optimizer.step() e quindi i gradienti memorizzati nei Tensori vengono impostati a zero con optimizer.zero_grad() per eseguire la successiva iterazione di retropropagazione e aggiornamento dei parametri. Pertanto, per accumulare il gradiente chiamiamo loss.backward() per il numero di accumuli di gradienti di cui abbiamo bisogno senza impostare i gradienti a zero in modo che si accumulino attraverso più iterazioni e poi li media per ottenere il gradiente medio attraverso le iterazioni di gradienti accumulate (loss = loss/ACC_STEPS). Dopo di ciò chiamiamo optimizer.step() e azzeriamo i gradienti per iniziare la successiva accumulazione di gradienti. Nel codice:

ACC_STEPS = dist.get_world_size() # == numero di GPU# iterare attraverso i datifor i, (idxs, row) in enumerate(loader):  loss = model(row)    # scala la perdita in base ai passaggi di accumulo  loss = loss/ACC_STEPS  loss.backward()  # continua ad accumulare i gradienti per ACC_STEPS  if ((i + 1) % ACC_STEPS == 0):    optimizer.step()      optimizer.zero_grad()

Codice completo

import osos.environ["CUDA_VISIBLE_DEVICES"] = "0,1"print(os.environ["CUDA_VISIBLE_DEVICES"])import torchimport torch.nn as nnfrom torch.utils.data import DataLoader, Dataset, Samplerimport argparseimport torch.optim as optim import numpy as npimport randomimport torch.backends.cudnn as cudnnimport torch.nn.functional as Ffrom torch.distributed import init_process_groupimport torch.distributed as distclass data_set(Dataset):        def __init__(self, df):        self.df = df            def __len__(self):        return len(self.df)        def __getitem__(self, index):                    sample = self.df[index]        return index, sample    class NeuralNetwork(nn.Module):    def __init__(self, dsize):        super().__init__()        self.linear =  nn.Linear(dsize, 1, bias=False)        self.linear.weight.data[:] = 1.    def forward(self, x):        x = self.linear(x)        loss = x.sum()        return loss                class DummySampler(Sampler):    def __init__(self, data, batch_size, n_gpus=2):        self.num_samples = len(data)        self.b_size = batch_size        self.n_gpus = n_gpus    def __iter__(self):        ids = []        for i in range(0, self.num_samples, self.b_size * self.n_gpus):            ids.append(np.arange(self.num_samples)[i: i + self.b_size*self.n_gpus :self.n_gpus])            ids.append(np.arange(self.num_samples)[i+1: (i+1) + self.b_size*self.n_gpus :self.n_gpus])        return iter(np.concatenate(ids))    def __len__(self):        # print ('\tcalling Sampler:__len__')        return self.num_samples            def main(args=None):        d_size = args.data_size    if args.distributed:        init_process_group(backend="nccl")        device = int(os.environ["LOCAL_RANK"])        torch.cuda.set_device(device)    else:        device = "cuda:0"    # fissiamo il seed per riproducibilità    seed = args.seed                    torch.manual_seed(seed)    np.random.seed(seed)    random.seed(seed)    cudnn.benchmark = True        # generiamo i dati    data = torch.rand(d_size, d_size)        model = NeuralNetwork(args.data_size)        model = model.to(device)          if args.distributed:        model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[device])            optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)    dataset = data_set(data)    if args.distributed:        sampler = torch.utils.data.DistributedSampler(dataset, shuffle=False)    else:        # definiamo `DummySampler` per una riproducibilità esatta con `DistributedSampler`        # che suddivide i dati come descritto nell'articolo.         sampler = DummySampler(dataset, args.batch_size)            loader = DataLoader(                dataset,                batch_size=args.batch_size,                num_workers=0,                pin_memory=True,                sampler=sampler,                shuffle=False,                collate_fn=None,            )                  if not args.distributed:        grads = []        # ACC_STEPS è lo stesso delle GPU poiché dobbiamo dividere la perdita per questo numero    # per ottenere lo stesso gradiente come da più GPU che sono     # mediati insieme    ACC_STEPS = args.acc_steps     optimizer.zero_grad()        for epoch in range(args.epochs):                if args.distributed:            loader.sampler.set_epoch(epoch)                    for i, (idxs, row) in enumerate(loader):            if args.distributed:                optimizer.zero_grad()                        row = row.to(device, non_blocking=True)                         if args.distributed:                rank = dist.get_rank() == 0            else:                rank = True                        loss = model(row)                          if args.distributed:                # esegue automaticamente la media dei gradienti grazie all'incapsulamento del modello in                 # `DistributedDataParallel`                loss.backward()            else:                # scala la perdita in base ai passaggi di accumulo                loss = loss/ACC_STEPS                loss.backward()                        if i == 0 and rank:                print(f"Epoch {epoch} {100 * '='}")            if not args.distributed:                if (i + 1) % ACC_STEPS == 0: # esegui solo l'aggiornamento quando abbiamo fatto ACC_STEPS                    # accumula i gradienti per l'intero epoch                    optimizer.step()                      optimizer.zero_grad()            else:                optimizer.step()                         if not args.distributed and args.verbose:            print(100 * "=")            print("Pesi del modello: ", model.linear.weight)            print(100 * "=")        elif args.distributed and args.verbose and rank:            print(100 * "=")            print("Pesi del modello: ", model.module.linear.weight)            print(100 * "=")    if __name__ == "__main__":            parser = argparse.ArgumentParser()    parser.add_argument('--distributed', action='store_true',)    parser.add_argument('--seed', default=0, type=int)     parser.add_argument('--epochs', default=2, type=int)     parser.add_argument('--batch_size', default=4, type=int)     parser.add_argument('--data_size', default=16, type=int)     parser.add_argument('--acc_steps', default=3, type=int)     parser.add_argument('--verbose', action='store_true',)        args = parser.parse_args()        print(args)    main(args)

Ora, se eseguiamo questi due script:

  • python3 ddp.py — epochs 2 — batch_size 4 — data_size 8 — verbose — acc_steps 2
  • torchrun — standalone — nproc_per_node=2 ddp.py — epochs 2 — distributed — batch_size 4 — data_size 8 — verbose

vedremo che otteniamo gli stessi parametri finali esatti del modello:

# Dai Gradienti AccumulatoriPesi del modello:  Parameter containing:tensor([[0.9472, 0.9440, 0.9527, 0.9687, 0.9570, 0.9343, 0.9411, 0.9186]],       device='cuda:0', requires_grad=True)# Dai DDP:Pesi del modello:  Parameter containing:tensor([[0.9472, 0.9440, 0.9527, 0.9687, 0.9570, 0.9343, 0.9411, 0.9186]],       device='cuda:0', requires_grad=True)

Conclusioni

In questo articolo abbiamo brevemente introdotto e dato un’intuizione dietro gli algoritmi DP, DDP e l’accumulo dei gradienti e abbiamo mostrato come aumentare la dimensione del batch effettivo anche senza avere più GPU. Una cosa importante da notare è che anche se otteniamo gli stessi risultati finali, l’addestramento con più GPU è molto più veloce rispetto all’uso dell’accumulo dei gradienti, quindi se la velocità di addestramento è importante, allora l’utilizzo di più GPU è l’unico modo per accelerare l’addestramento.