Sviluppare il tuo primo agente di intelligenza artificiale Deep Q-Learning

Sviluppare il tuo primo agente di intelligenza artificiale con Deep Q-Learning

Immergiti nel mondo dell’intelligenza artificiale – Costruisci una palestra di apprendimento rinforzato profondo da zero.

Costruisci la tua palestra di apprendimento rinforzato profondo - Immagine dell'autore

Indice

Se hai già una comprensione dei concetti di Rinforzo e Apprendimento Q-Learning Profondo, sentiti libero di passare direttamente al tutorial passo-passo. Lì troverai tutte le risorse e il codice necessario per costruire una palestra di apprendimento rinforzato profondo da zero, compreso l’ambiente, l’agente e il protocollo di allenamento.

Introduzione

Perché l’apprendimento rinforzato?Cosa otterraiCos’è l’apprendimento rinforzato?Apprendimento Q-Learning Profondo

Tutorial passo-passo

1. Configurazione iniziale2. L’immagine generale3. L’ambiente: fondamenti iniziali4. Implementazione dell’agente: architettura neurale e politica5. Agisci sull’ambiente: conclusioni6. Impara dagli esperimenti: Replay di esperienze7. Definisci il processo di apprendimento dell’agente: adattamento del NN8. Esegui il ciclo di allenamento: mettendo tutto insieme9. Conclusione10. Bonus: Ottimizza la rappresentazione dello stato

Perché l’apprendimento rinforzato?

L’adozione diffusa di sistemi AI avanzati, come ChatGPT, Bard, Midjourney, Stable Diffusion e molti altri, ha suscitato interesse nel campo dell’intelligenza artificiale, dell’apprendimento automatico e delle reti neurali, che spesso rimane insoddisfatto a causa della natura tecnica dell’implementazione di tali sistemi.

Per coloro che desiderano iniziare il loro percorso nell’AI (o continuare quello iniziato), costruire una palestra di apprendimento rinforzato utilizzando l’apprendimento Q-Learning Profondo è un ottimo punto di partenza, poiché non richiede conoscenze avanzate per essere implementato, può essere facilmente ampliato per risolvere problemi complessi e può offrire immediata e tangibile comprensione di come l’intelligenza artificiale diventa “intelligente”.

Cosa otterrai

Premettendo che tu abbia una comprensione di base di Python, alla fine di questa introduzione all’apprendimento rinforzato profondo, senza l’utilizzo di framework di apprendimento rinforzato di alto livello, avrai sviluppato la tua palestra per allenare un agente a risolvere un problema semplice: spostarsi dal punto di partenza al traguardo!

Non è molto affascinante, ma acquisirai esperienza pratica su argomenti come la costruzione di un ambiente, la definizione di strutture di ricompensa e l’architettura neurale di base, l’aggiustamento dei parametri ambientali per osservare diversi comportamenti di apprendimento e trovare un equilibrio tra esplorazione e sfruttamento nella presa di decisioni.

Avrai quindi tutti gli strumenti di cui hai bisogno per implementare l’ambiente e i sistemi complessi che desideri e sarai ben preparato per approfondire argomenti come le reti neurali e le strategie avanzate di ottimizzazione nell’apprendimento per rinforzo.

Immagine dell'autore utilizzando l'ambiente LunarLander-v2 di Gymnasium

Acquisirai anche la fiducia e la comprensione necessarie per utilizzare efficacemente strumenti pre-costruiti come l’OpenAI Gym, poiché ogni componente del sistema viene implementato da zero e demistificato. Ciò ti consente di integrare senza soluzione di continuità queste potenti risorse nei tuoi progetti di intelligenza artificiale.

Cos’è l’apprendimento per rinforzo?

L’apprendimento per rinforzo (RL) è una sotto-area del machine learning (ML) focalizzata specificamente su come gli agenti (le entità che prendono decisioni) compiono azioni in un ambiente per completare un obiettivo.

Le sue implementazioni includono:

  • Giochi
  • Veicoli autonomi
  • Robotica
  • Finanza (trading algoritmico)
  • Elaborazione del linguaggio naturale
  • e molto altro..

L’idea dell’RL si basa sui principi fondamentali della psicologia comportamentale in cui un animale o una persona impara dalle conseguenze delle proprie azioni. Se un’azione porta a un buon risultato, allora l’agente viene premiato; se non lo fa, viene punito o non viene fornito alcun premio.

Prima di continuare, è importante comprendere alcuni termini comunemente utilizzati:

  • Ambiente: Questo è il mondo, il luogo in cui l’agente opera. Imposta le regole, i confini e le ricompense che l’agente deve navigare.
  • Agente: L’agente che prende le decisioni all’interno dell’ambiente. L’agente prende azioni in base alla sua comprensione dello stato in cui si trova.
  • Stato: Uno snapshot dettagliato della situazione attuale dell’agente nell’ambiente, comprese le metriche rilevanti o le informazioni sensoriali utilizzate per prendere decisioni.
  • Azione: La misura specifica che l’agente intraprende per interagire con l’ambiente, come muoversi, raccogliere un oggetto o avviare un’interazione.
  • Ricompensa: Il feedback dato dall’ambiente come risultato delle azioni dell’agente, che può essere positivo, negativo o neutro, guidando il processo di apprendimento.
  • Spazio stato/azione: La combinazione di tutti gli stati possibili che l’agente può incontrare e tutte le azioni che può compiere nell’ambiente. Questo definisce il campo delle decisioni e delle situazioni che l’agente deve imparare a navigare.

Fondamentalmente, in ogni passo (turno) del programma, l’agente riceve uno stato dall’ambiente, sceglie un’azione, riceve una ricompensa o una punizione e l’ambiente viene aggiornato o l’episodio è completo. Le informazioni ricevute dopo ogni passo vengono salvate come “esperienza” per il successivo addestramento.

Per un esempio più concreto, immagina di giocare a scacchi. La scacchiera è l’ambiente e tu sei l’agente. In ogni passo (o turno) visualizzi lo stato della scacchiera e scegli dallo spazio azioni, che è l’insieme di tutte le mosse possibili che potresti fare, e selezioni l’azione con il valore di ricompensa futura più alto possibile. Dopo che la mossa è stata effettuata, valuti se è stata una buona azione o meno e impari a fare meglio la prossima volta.

Potrebbe sembrare molta informazione all’inizio, ma mentre costruisci tutto ciò tu stesso, questi termini diventeranno molto naturali.

Deep Q-Learning

Il Q-Learning è un algoritmo utilizzato nell’ML in cui la ‘Q’ sta per “Qualità”, come il valore delle azioni che un agente può compiere. Funziona creando una tabella di valori Q, azioni e la qualità associata ad esse, che stimano la ricompensa futura attesa per l’esecuzione di un’azione in uno stato dato.

All’agente viene fornito lo stato dell’ambiente, controlla la tabella per vedere se lo ha già incontrato e quindi sceglie l’azione con il valore di ricompensa più alto.

Flusso sequenziale del Q-Learning: dalla valutazione dello stato all'aggiornamento della ricompensa e della tabella Q. - Immagine dell'autore

Tuttavia, il Q-Learning ha alcuni svantaggi. Ogni coppia di stato e azione deve essere esplorata per ottenere buoni risultati. Se gli spazi di stato e azione (l’insieme di tutti gli stati e azioni possibili) sono troppo grandi, non è possibile memorizzarli tutti in una tabella.

Qui entra in gioco il Deep Q-Learning (DQL), un’evoluzione del Q-Learning. DQL utilizza una rete neurale profonda (NN) per approssimare una funzione di valore Q anziché memorizzarli in una tabella. Ciò consente di gestire ambienti che hanno spazi di stato ad alta dimensionalità, come input di immagini da una telecamera, cosa che non sarebbe pratica per il Q-Learning tradizionale.

Deep Q-Learning è l'intersezione tra Q-Learning e Reti Neurali Profonde - Immagine dell'autore

La rete neurale può generalizzare su stati e azioni simili, scegliendo una mossa desiderabile anche se non è stata addestrata sulla situazione esatta, eliminando la necessità di una grande tabella.

Come la rete neurale fa questo va oltre lo scopo di questo tutorial. Fortunatamente, non è necessaria una comprensione approfondita per implementare efficacemente il Deep Q-Learning.

Costruzione del Reinforcement Learning Gym

1. Setup Iniziale

Prima di iniziare a codificare il nostro agente di intelligenza artificiale, è consigliabile avere una solida comprensione dei principi di Programmazione Orientata agli Oggetti (OOP) in Python.

Se non hai già installato Python, di seguito è riportato un semplice tutorial di Bhargav Bachina per iniziare. La versione che userò è la 3.11.6.

Come Installare e Iniziare con Python

Una guida per principianti e per chiunque voglia iniziare a imparare Python

VoAGI.com

L’unica dipendenza di cui avrai bisogno è TensorFlow, una libreria di apprendimento automatico open-source di Google che utilizzeremo per costruire e addestrare la nostra rete neurale. Puoi installarlo tramite pip nel terminale. La mia versione è la 2.14.0.

pip install tensorflow

O se quello non funziona:

pip3 install tensorflow

Avrai anche bisogno del pacchetto NumPy, ma questo dovrebbe essere incluso in TensorFlow. Se incontrassi dei problemi, pip install numpy.

È inoltre consigliato creare un nuovo file per ogni classe (ad esempio, environment.py). Questo eviterà che tu venga sopraffatto e faciliterà la risoluzione di eventuali errori che potresti incontrare.

Per il tuo riferimento, ecco il repository GitHub con il codice completo: https://github.com/HestonCV/rl-gym-from-scratch. Sentiti libero di clonarlo, esplorarlo e usarlo come punto di riferimento!

2. Il Quadro Generale

Per comprendere davvero i concetti anziché limitarsi a copiare il codice, è fondamentale capire le diverse parti che costruiremo e come si inseriscono insieme. In questo modo, ogni pezzo avrà un posto nell’immagine più grande.

Di seguito è riportato il codice per un ciclo di addestramento con 5000 episodi. Un episodio è essenzialmente un round completo di interazione tra l’agente e l’ambiente, dall’inizio alla fine.

Questo non dovrebbe essere implementato o compreso completamente in questo momento. Man mano che costruiamo ogni parte, se vuoi vedere come una specifica classe o metodo verrà utilizzato, fai riferimento a questo.

from environment import Environmentfrom agent import Agentfrom experience_replay import ExperienceReplayimport timeif __name__ == '__main__':    grid_size = 5    environment = Environment(grid_size=grid_size, render_on=True)    agent = Agent(grid_size=grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01)    # agent.load(f'models/model_{grid_size}.h5')    experience_replay = ExperienceReplay(capacity=10000, batch_size=32)        # Numero di episodi da eseguire prima che l'addestramento si interrompa    episodes = 5000    # Numero massimo di passi in ciascun episodio    max_steps = 200    for episode in range(episodes):        # Ottieni lo stato iniziale dell'ambiente e imposta done su False        state = environment.reset()        # Loop fino alla fine dell'episodio        for step in range(max_steps):            print('Episodio:', episode)            print('Passo:', step)            print('Epsilon:', agent.epsilon)            # Ottieni la scelta dell'azione dalla politica degli agenti            action = agent.get_action(state)            # Fai un passo nell'ambiente e salva l'esperienza            reward, next_state, done = environment.step(action)            experience_replay.add_experience(state, action, reward, next_state, done)            # Se l'esperienza di memorizzazione ha abbastanza memoria per fornire un campione, addestra l'agente            if experience_replay.can_provide_sample():                experiences = experience_replay.sample_batch()                agent.learn(experiences)            # Imposta lo stato a next_state            state = next_state                        if done:                break            # time.sleep(0.5)        agent.save(f'models/model_{grid_size}.h5')

Ogni iterazione interna viene considerata un passo.

Processo di formazione attraverso l'interazione Agente-Ambiente - Immagine dell'autore

In ogni passo:

  • Lo stato viene recuperato dall’ambiente.
  • L’agente sceglie un’azione in base a questo stato.
  • Si agisce sull’ambiente, restituendo la ricompensa, lo stato risultante dopo aver preso l’azione e se l’episodio è terminato.
  • Lo stato iniziale, l’azione, la ricompensa, lo stato successivo e il completamento vengono quindi salvati in experience_replay come una sorta di memoria a lungo termine (esperienza).
  • Successivamente, l’agente viene addestrato su un campione casuale di queste esperienze.

Alla fine di ogni episodio, o a intervalli regolari a tuo piacimento, i pesi del modello vengono salvati nella cartella dei modelli. Questi possono essere caricati in seguito per evitare di addestrare da zero ogni volta. L’ambiente viene quindi reimpostato all’inizio dell’episodio successivo.

Questa struttura di base è praticamente tutto ciò che serve per creare un agente intelligente in grado di risolvere una vasta gamma di problemi!

Come indicato nell’introduzione, il nostro problema per l’agente è abbastanza semplice: arrivare dalla sua posizione iniziale in una griglia alla posizione obiettivo designata.

3. L’Ambiente: Fondamenta Iniziali

Il punto più ovvio per iniziare lo sviluppo di questo sistema è l’ambiente.

Per avere una palestra RL funzionante, l’ambiente deve fare alcune cose:

  • Mantenere lo stato corrente del mondo.
  • Tenere traccia dell’obiettivo e dell’agente.
  • Consentire all’agente di apportare modifiche al mondo.
  • Restituire lo stato in una forma comprensibile dal modello.
  • Rappresentarlo in modo comprensibile per osservare l’agente.

Questo sarà il luogo in cui l’agente passerà tutta la sua vita. Definiremo l’ambiente come una semplice matrice quadrata/array 2D, o come una lista di liste in Python.

Questo ambiente avrà uno spazio di stato discreto, il che significa che i possibili stati che l’agente può incontrare sono distinti e contabili. Ogni stato è una condizione o uno scenario separato e specifico nell’ambiente, a differenza di uno spazio di stato continuo in cui gli stati possono variare in modo infinito e fluido – pensa agli scacchi rispetto al controllo di una macchina.

DQL è progettato specificamente per spazi di azione discreti (un numero finito di azioni) – è su questo che ci concentreremo. Altri metodi sono utilizzati per spazi di azione continui.

Nella griglia, lo spazio vuoto sarà rappresentato da 0, l’agente sarà rappresentato da 1 e l’obiettivo sarà rappresentato da -1. La dimensione dell’ambiente può essere quella che si desidera, ma man mano che l’ambiente diventa più grande, l’insieme di tutti gli stati possibili (spazio di stato) cresce in modo esponenziale. Questo può rallentare significativamente il tempo di addestramento.

La griglia avrà un aspetto simile a questo quando viene rappresentata:

[0, 1, 0, 0, 0][0, 0, 0, 0, 0][0, 0, 0, 0, 0][0, 0, 0, -1, 0][0, 0, 0, 0, 0]

Costruzione della classe Ambiente e del metodo resetInizieremo implementando la classe Ambiente e un modo per inizializzare l’ambiente. Per ora, richiederà un numero intero, grid_size, ma espanderemo questa funzionalità a breve.

import numpy as npclass Ambiente:    def __init__(self, grid_size):        self.grid_size = grid_size        self.grid = []    def reset(self):        # Inizializza la griglia vuota come lista 2D di 0        self.grid = np.zeros((self.grid_size, self.grid_size))

Quando viene creata una nuova istanza, Ambiente salva grid_size e inizializza una griglia vuota.

Il metodo reset popola la griglia utilizzando np.zeros((self.grid_size, self.grid_size)), che prende una tupla, la forma, e restituisce un array NumPy 2D di quella forma composto solo da zeri.

Un array NumPy è una struttura dati a griglia che si comporta in modo simile a una lista in Python, tranne che ci consente di memorizzare e manipolare in modo efficiente dati numerici. Consente operazioni vettorializzate, il che significa che le operazioni vengono applicate automaticamente a tutti gli elementi dell’array senza bisogno di cicli espliciti.

Questo consente di effettuare calcoli su grandi set di dati molto più veloci ed efficienti rispetto alle liste Python standard. Non solo, ma è la struttura dati che ci si aspetta per l’architettura della rete neurale del nostro agente!

Perché il nome reset? Beh, questo metodo verrà chiamato per ripristinare l’ambiente e restituirà alla fine lo stato iniziale della griglia.

Aggiungere l’agente e l’obiettivoSuccessivamente, costruiremo i metodi per aggiungere l’agente e l’obiettivo alla griglia.

import randomdef add_agent(self):    # Scegliere una posizione casuale    location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))        # L'agente è rappresentato da un 1    self.grid[location[0]][location[1]] = 1        return locationdef add_goal(self):    # Scegliere una posizione casuale    location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))    # Ottenere una posizione casuale fino a quando non è occupata    while self.grid[location[0]][location[1]] == 1:        location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))        # L'obiettivo è rappresentato da un -1    self.grid[location[0]][location[1]] = -1    return location

Le posizioni per l’agente e l’obiettivo saranno rappresentate da una tupla (x, y). Entrambi i metodi selezionano valori casuali all’interno dei limiti della griglia e restituiscono la posizione. La differenza principale è che add_goal si assicura di non selezionare una posizione già occupata dall’agente.

Posizioniamo l’agente e l’obiettivo in posizioni di partenza casuali per introdurre variabilità in ogni episodio, il che aiuta l’agente a imparare a navigare nell’ambiente da diversi punti di partenza, invece di memorizzare un percorso.

Infine, aggiungeremo un metodo per visualizzare il mondo nella console per consentirci di vedere le interazioni tra l’agente e l’ambiente.

def render(self):        # Convertire in una lista di interi per migliorare la formattazione        grid = self.grid.astype(int).tolist()        for row in grid:            print(row)        print('') # Per aggiungere un po' di spazio tra i render per ogni passaggio

render fa tre cose: converte gli elementi di self.grid nel tipo int, lo converte in una lista Python e stampa ogni riga.

L’unico motivo per cui non stampiamo ciascuna riga direttamente dalla matrice NumPy è semplicemente che non appare altrettanto ordinato.

Mettere tutto insieme..

import numpy as npimport randomclass Environment:    def __init__(self, grid_size):        self.grid_size = grid_size        self.grid = []    def reset(self):        # Inizializza la griglia vuota come una matrice 2D di 0s        self.grid = np.zeros((self.grid_size, self.grid_size))          def add_agent(self):        # Scegliere una posizione casuale        location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))                # L'agente è rappresentato da un 1        self.grid[location[0]][location[1]] = 1                return location    def add_goal(self):        # Scegliere una posizione casuale        location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))            # Ottenere una posizione casuale fino a quando non è occupata        while self.grid[location[0]][location[1]] == 1:            location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))                # L'obiettivo è rappresentato da un -1        self.grid[location[0]][location[1]] = -1            return location          def render(self):        # Convertire in una lista di interi per migliorare la formattazione        grid = self.grid.astype(int).tolist()        for row in grid:            print(row)        print('') # Per aggiungere un po' di spazio tra i render per ogni passo# Prova ambienteenv = Environment(5)env.reset()agent_location = env.add_agent()goal_location = env.add_goal()env.render()print(f'Posizione agente: {agent_location}')print(f'Posizione obiettivo: {goal_location}')

>>>[0, 0, 0, 0, 0][0, 0, -1, 0, 0][0, 0, 0, 0, 0][0, 0, 0, 1, 0][0, 0, 0, 0, 0]Posizione agente: (3, 3)Posizione obiettivo: (1, 2)

Quando si guardano le posizioni, potrebbe sembrare che ci sia stato qualche errore, ma dovrebbero essere lette come (riga, colonna) dall’alto sinistra al basso destro. Inoltre, ricorda che le coordinate partono da zero.

Ora, l’ambiente è definito. Cosa viene dopo?

Approfondiamo reset. Modifichiamo il metodo reset per gestire il posizionamento dell’agente e dell’obiettivo per noi. Nel mentre, automatizziamo anche la visualizzazione.

class Ambiente:    def __init__(self, dimensione_griglia, render_on=False):        self.dimensione_griglia = dimensione_griglia        self.griglia = []        # Assicurati di aggiungere gli attributi nuovi        self.render_on = render_on        self.posizione_agente = None        self.posizione_obiettivo = None    def reset(self):        # Inizializza la griglia vuota come un array 2D di zeri        self.griglia = np.zeros((self.dimensione_griglia, self.dimensione_griglia))        # Aggiungi l'agente e l'obiettivo alla griglia        self.posizione_agente = self.aggiungi_agente()        self.posizione_obiettivo = self.aggiungi_obiettivo()        if self.render_on:            self.render()

Ora, quando viene chiamato reset, l’agente e l’obiettivo vengono aggiunti alla griglia, le loro posizioni iniziali vengono salvate e se render_on è impostato su true, verrà visualizzata la griglia.

...# Test dell'ambienteamb = Ambiente(5, render_on=True)amb.reset()# Ora, per accedere alle posizioni dell'agente e dell'obiettivo, puoi utilizzare gli attributi dell'Ambienteprint(f'Posizione Agente: {amb.posizione_agente}')print(f'Posizione Obiettivo: {amb.posizione_obiettivo}')

>>>[0, 0, 0, 0, 0][0, 0, 0, 0, 0][0, 0, 0, 0, 0][0, 0, 0, 0, -1][1, 0, 0, 0, 0]Posizione Agente: (4, 0)Posizione Obiettivo: (3, 4)

Definire lo stato dell’ambiente L’ultimo metodo che implementeremo per ora è get_state. A prima vista sembra che lo stato potrebbe essere semplicemente la griglia stessa, ma il problema di questo approccio è che non è quello che ci si aspetta dalla rete neurale.

Le reti neurali di solito richiedono un input unidimensionale, non la forma bidimensionale che attualmente viene rappresentata dalla griglia. Possiamo risolvere questo appiattendo la griglia utilizzando il metodo flatten integrato di NumPy. Questo posizionerà ogni riga nello stesso array.

def get_state(self):    # Appiattisce la griglia da 2D a 1D    stato = self.griglia.flatten()    return stato

Questo trasformerà:

[0, 0, 0, 0, 0][0, 0, 0, 1, 0][0, 0, 0, 0, 0][0, 0, 0, 0, -1][0, 0, 0, 0, 0]

In:

[0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0, 0, 0]

Come puoi vedere, non è immediatamente evidente quali celle siano quali, ma questo non sarà un problema per una rete neurale profonda.

Ora possiamo aggiornare reset per restituire lo stato subito dopo che griglia è popolata. Null’altro cambierà.

def reset(self):    ...    # Restituisce lo stato iniziale della griglia    return self.get_state()

Codice completo fino a questo punto..

import randomclass Ambiente:    def __init__(self, dimensione_griglia, render_on=False):        self.dimensione_griglia = dimensione_griglia        self.griglia = []        self.render_on = render_on        self.posizione_agente = None        self.posizione_obiettivo = None    def reset(self):        # Inizializza la griglia vuota come un array 2D di zeri        self.griglia = np.zeros((self.dimensione_griglia, self.dimensione_griglia))        # Aggiungi l'agente e l'obiettivo alla griglia        self.posizione_agente = self.aggiungi_agente()        self.posizione_obiettivo = self.aggiungi_obiettivo()        if self.render_on:            self.render()        # Restituisce lo stato iniziale della griglia        return self.get_state()    def aggiungi_agente(self):        # Scegli una posizione casuale        posizione = (random.randint(0, self.dimensione_griglia - 1), random.randint(0, self.dimensione_griglia - 1))        # L'agente è rappresentato da un 1        self.griglia[posizione[0]][posizione[1]] = 1        return posizione    def aggiungi_obiettivo(self):        # Scegli una posizione casuale        posizione = (random.randint(0, self.dimensione_griglia - 1), random.randint(0, self.dimensione_griglia - 1))        # Ottieni una posizione casuale fino a quando non è occupata        while self.griglia[posizione[0]][posizione[1]] == 1:            posizione = (random.randint(0, self.dimensione_griglia - 1), random.randint(0, self.dimensione_griglia - 1))                   # L'obiettivo è rappresentato da un -1        self.griglia[posizione[0]][posizione[1]] = -1        return posizione          def render(self):        # Converti in una lista di interi per migliorare la formattazione        griglia = self.griglia.astype(int).tolist()        for riga in griglia:            print(riga)        print('') # Per aggiungere spazio tra le visualizzazioni per ciascun passaggio          def get_state(self):        # Appiattisce la griglia da 2D a 1D        stato = self.griglia.flatten()        return stato

Hai ora implementato con successo le basi per l’ambiente! Tuttavia, se non l’hai notato, non possiamo ancora interagire con esso. L’agente è bloccato sul posto.

Torneremo su questo problema in seguito, dopo che la classe Agente sarà stata codificata per fornire un contesto migliore.

4. Implementa L’Architettura Neurale e La Policy Dell’Agente

Come detto in precedenza, l’agente è l’entità a cui viene dato lo stato del suo ambiente, in questo caso una versione appiattita della griglia del mondo, e prende una decisione su quale azione intraprendere nello spazio d’azione.

Solo per ribadire, lo spazio d’azione è l’insieme di tutte le possibili azioni, in questo scenario l’agente può muoversi in su, giù, a sinistra e a destra, quindi la dimensione dello spazio d’azione è 4.

Lo spazio dello stato è l’insieme di tutti gli stati possibili. Questo può essere un numero enorme a seconda dell’ambiente e del punto di vista dell’agente. Nel nostro caso, se il mondo è una griglia 5×5 ci sono 600 stati possibili, ma se il mondo è una griglia 25×25 ci sono 390.000, aumentando notevolmente il tempo di addestramento.

Perché un agente apprenda in modo efficace a completare un obiettivo, ha bisogno di alcune cose:

  • Rete neurale per approssimare i Q-values (valori Q stimati, ovvero la quantità totale di ricompensa futura per un’azione) nel caso di DQL.
  • Policy o una strategia che l’agente segue per scegliere un’azione.
  • Segnali di ricompensa dall’ambiente per dire all’agente quanto bene sta facendo.
  • Capacità di addestrarsi sulle esperienze passate.

Esistono due politiche differenti che si possono implementare:

  • Politica avida: Scegli l’azione con il Q-value più alto nello stato attuale.
  • Politica Epsilon-Greedy: Scegli l’azione con il Q-value più alto nello stato attuale, ma c’è una piccola possibilità, epsilon (comunemente indicato come ϵ), di scegliere un’azione casuale. Se epsilon = 0,02, allora c’è una probabilità del 2% che l’azione sarà casuale.

Ciò che implementeremo è la Politica Epsilon-Greedy.

Perché le azioni casuali aiutano l’apprendimento dell’agente? Esplorazione.

Quando l’agente inizia, potrebbe imparare un percorso subottimale verso l’obiettivo e continuare a fare questa scelta senza cambiarla o imparare una nuova strada.

Iniziando con un valore epsilon grande e diminuendolo lentamente, l’agente può esplorare completamente l’ambiente mentre aggiorna i suoi Q-values prima di sfruttare le strategie apprese. La quantità in cui diminuiamo epsilon nel tempo si chiama decadimento epsilon, che avrà più senso presto.

Come abbiamo fatto con l’ambiente, rappresenteremo l’agente con una classe.

Ora, prima di implementare la policy, dobbiamo trovare un modo per ottenere i Q-values. Qui entra in gioco il “cervello” dell’agente, ovvero la rete neurale.

La rete neurale Senza allontanarci troppo dal tema, una rete neurale è semplicemente una funzione massiccia. I valori entrano, passano attraverso ogni livello e vengono trasformati, quindi alla fine escono dei valori diversi. Nulla di più semplice di questo. La magia avviene quando inizia l’addestramento.

L’idea è quella di fornire alla NN grandi quantità di dati etichettati come, “questo è un input, e questo è ciò che dovresti produrre in output”. A ogni passo di addestramento, la NN regola lentamente i valori tra i neuroni, cercando di avvicinarsi il più possibile agli output dati, trovando schemi all’interno dei dati e sperando di aiutarci a fare previsioni per input che la rete non ha mai visto.

Trasformazione dello stato in Q-Values attraverso una rete neurale - Immagine dell'autore

La classe Agente e la definizione dell’architettura neurale Per ora definiremo l’architettura neurale usando TensorFlow e ci concentreremo sul “passaggio in avanti” dei dati.

from tensorflow.keras.layers import Densefrom tensorflow.keras.models import Sequentialclass Agente:    def __init__(self, dimensione_griglia):        self.dimensione_griglia = dimensione_griglia        self.modello = self.costruisci_modello()    def costruisci_modello(self):        # Crea un modello sequenziale con 3 strati        modello = Sequential([            # Lo strato di input si aspetta una griglia appiattita, quindi la forma di input è la dimensione griglia al quadrato            Dense(128, activation='relu', input_shape=(self.dimensione_griglia**2,)),            Dense(64, activation='relu'),            # Lo strato di output con 4 unità per le azioni possibili (su, giù, sinistra, destra)            Dense(4, activation='linear')        ])        modello.compile(optimizer='adam', loss='mse')        return modello

Di nuovo, se non sei familiare con le reti neurali, non concentrarti troppo su questa sezione. Nonostante usiamo attivazioni come ‘relu’ e ‘linear’ nel nostro modello, un’approfondita esplorazione delle funzioni di attivazione è al di là dello scopo di questo articolo.

Tutto quello che devi sapere è che il modello prende come input lo stato, i valori vengono trasformati ad ogni livello del modello e vengono prodotti i quattro Q-valori corrispondenti ad ogni azione.

Nella costruzione della rete neurale dell’agente, iniziamo con uno strato di input che elabora lo stato della griglia, rappresentato come un array unidimensionale di dimensione grid_size². Questo perché abbiamo appiattito la griglia per semplificare l’input. Questo strato è l’input stesso e non ha bisogno di essere definito nella nostra architettura perché non ha input.

Successivamente, abbiamo due strati hidden. Questi sono valori che non vediamo, ma mentre il nostro modello apprende sono importanti per ottenere un’approssimazione più accurata della funzione Q-valore:

  1. Il primo strato hidden ha 128 neuroni, Dense(128, activation='relu'), e prende come input la griglia appiattita.
  2. Il secondo strato hidden consiste di 64 neuroni, Dense(64, activation='relu'), e continua a elaborare le informazioni.

Infine, lo strato di output, Dense(4, activation='linear'), consta di 4 neuroni, corrispondenti alle quattro azioni possibili (su, giù, sinistra, destra). Questo strato produce i Q-valori, stime per il premio futuro di ogni azione.

Tipicamente, più complessi sono i problemi che devi risolvere, più strati nascosti e neuroni saranno necessari. Due strati nascosti dovrebbero essere sufficienti per il nostro semplice caso d’uso.

I neuroni e gli strati possono e devono essere esplorati per trovare un equilibrio tra velocità e risultati, ognuno contribuendo alla capacità della rete di catturare ed imparare dalle sfumature dei dati. Come lo spazio dello stato, maggiore è la rete neurale, più lenta sarà l’addestramento.

Politica GreedyUsando questa rete neurale, siamo ora in grado di ottenere una previsione del valore Q, anche se non ancora molto precisa, e prendere una decisione.

import numpy as np   def get_action(self, state):    # Aggiungi una dimensione extra allo stato per creare un batch con una sola istanza    state = np.expand_dims(state, axis=0)        # Usa il modello per prevedere i Q-valori (valori delle azioni) per lo stato dato    q_values = self.model.predict(state, verbose=0)        # Seleziona e restituisci l'azione con il valore Q più alto    action = np.argmax(q_values[0]) # Prendi l'azione dalla prima (e unica) entry        return action

L’architettura delle reti neurali di TensorFlow richiede che l’input, lo stato, sia in batch. Questo è molto utile quando hai un grande numero di input e vuoi ottenere un batch completo di output, ma può essere un po’ confuso quando devi prevedere solo un input.

state = np.expand_dims(state, axis=0)

Possiamo risolvere questo utilizzando il metodo expand_dims di NumPy, specificando axis=0. Ciò crea semplicemente un batch di un unico input. Ad esempio, lo stato di una griglia di dimensione 5×5:

[0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0, 0, 0]

Diventa:

[[0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0, 0, 0]]

Nell’addestramento del modello, tipicamente userai batch di dimensione 32 o più. Assomiglierà a qualcosa del genere:

[[0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0, 0, 0], [0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, -1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0], ... [0, 0, 0, 0, 

Ora che abbiamo preparato l'input per il modello nel formato corretto, possiamo prevedere i valori Q per ogni azione e scegliere quello più alto.

...# Usa il modello per prevedere i valori Q (valori azione) per lo stato datoq_values = self.model.predict(state, verbose=0)# Seleziona e restituisci l'azione con il valore Q più altoaction = np.argmax(q_values[0]) # Prendi l'azione dalla prima (e unica) voce...

Semplicemente diamo al modello lo stato e restituisce un lotto di previsioni. Ricorda, perché stiamo alimentando la rete un lotto di una, restituirà un lotto di una. Inoltre, verbose=0 assicura che la console rimanga libera dai messaggi di debug di routine ogni volta che viene chiamata la funzione di previsione.

Infine, scegliamo e restituiamo l'indice dell'azione con il valore più alto usando np.argmax sulla prima e unica voce nel lotto.

Nel nostro caso, gli indici 0, 1, 2 e 3 verranno mappati rispettivamente su su, giù, sinistra e destra.

La politica avida seleziona sempre l'azione che ha la ricompensa più alta secondo i valori Q correnti, il che potrebbe non sempre portare ai migliori risultati a lungo termine.

Politica Epsilon-AvidAbbiamo implementato la politica avida, ma quello che vogliamo avere è la politica Epsilon-Avid. Questo introduce casualità nella scelta dell'agente per consentire l'esplorazione dello spazio degli stati.

Giusto per ripassare, epsilon è la probabilità che venga scelta un'azione casuale. Vogliamo anche un modo per diminuire questo nel tempo mentre l'agente impara, consentendo lo sfruttamento della sua politica appresa. Come accennato in precedenza, questo si chiama decadimento epsilon.

Il valore di decadimento epsilon dovrebbe essere impostato su un numero decimale inferiore a 1, che viene utilizzato per ridurre progressivamente il valore epsilon dopo ogni passo che l'agente compie.

Tipicamente epsilon partirà da 1 e il decadimento epsilon sarà un valore molto vicino a 1, come 0,998. Dopo ogni passo nel processo di addestramento, moltiplichi epsilon per il decadimento epsilon.

Per illustrare questo, di seguito viene mostrato come epsilon cambierà nel processo di addestramento.

Inizializzazione Valori:epsilon = 1epsilon_decay = 0,998-----------------Passo 1:epsilon = 1epsilon = 1 * 0,998 = 0,998-----------------Passo 2:epsilon = 0,998epsilon = 0,998 * 0,998 = 0,996-----------------Passo 3:epsilon = 0,996epsilon = 0,996 * 0,998 = 0,994-----------------Passo 4:epsilon = 0,994epsilon = 0,994 * 0,998 = 0,992-----------------...-----------------Passo 1000:epsilon = 1 * (0,998)^1000 = 0,135-----------------...e così via

Come puoi vedere, epsilon si avvicina lentamente a zero con ogni passo. Al passo 1000, c'è una possibilità del 13,5% che venga scelta un'azione casuale. Il decadimento epsilon è un valore che deve essere regolato in base allo spazio degli stati. Con uno spazio degli stati ampio, potrebbe essere necessaria una maggiore esplorazione o un decadimento epsilon più alto.

Decadimento di epsilon nel tempo - Immagine dell'autore

Anche quando l'agente è addestrato bene, è vantaggioso mantenere un valore epsilon basso. Dovremmo definire un punto di arresto in cui epsilon non diminuisce ulteriormente, l'epsilon finale. Questo può essere 0,1, 0,01 o anche 0,001 a seconda dell'uso e della complessità del compito.

Nella figura sopra, noterai che epsilon smette di diminuire a 0,1, l'epsilon finale predefinito.

Aggiorniamo quindi la nostra classe Agente per incorporare epsilon.

import numpy as npclass Agente:    def __init__(self, dimensione_griglia, epsilon=1, epsilon_decay=0,998, epsilon_end=0,01):        self.dimensione_griglia = dimensione_griglia        self.epsilon = epsilon        self.epsilon_decay = epsilon_decay        self.epsilon_end = epsilon_end        ...    ...    def get_action(self, stato):        # rand() restituisce un valore casuale compreso tra 0 e 1        if np.random.rand() <= self.epsilon:            # Esplorazione: azione casuale            action = np.random.randint(0, 4)        else:            # Aggiungi una dimensione in più allo stato per creare un batch con un'istanza            stato = np.expand_dims(stato, axis=0)            # Usa il modello per prevedere i valori Q (valori azione) per lo stato dato            q_values = self.model.predict(stato, verbose=0)            # Seleziona e restituisci l'azione con il valore Q più alto            action = np.argmax(q_values[0]) # Prendi l'azione dalla prima (e unica) voce                # Decadimento del valore epsilon per ridurre l'esplorazione nel tempo        if self.epsilon > self.epsilon_end:            self.epsilon *= self.epsilon_decay        return action

Abbiamo dato ai valori predefiniti di epsilon, epsilon_decay e epsilon_end i valori 1, 0,998 e 0,01, rispettivamente.

Ricordate che epsilon e i valori correlati sono iperparametri, parametri utilizzati per controllare il processo di apprendimento. Possono e dovrebbero essere sperimentati per ottenere il miglior risultato.

Il metodo get_action è stato aggiornato per incorporare epsilon. Se il valore casuale dato da np.random.rand è minore o uguale a epsilon, viene scelta un'azione casuale. Altrimenti, il processo è lo stesso di prima.

Infine, se epsilon non ha raggiunto epsilon_end, lo aggiorniamo moltiplicandolo per epsilon_decay in questo modo: self.epsilon *= self.epsilon_decay.

Agente fino a questo punto:

from tensorflow.keras.layers import Densefrom tensorflow.keras.models import Sequentialimport numpy as npclass Agente:    def __init__(self, grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01):        self.grid_size = grid_size        self.epsilon = epsilon        self.epsilon_decay = epsilon_decay        self.epsilon_end = epsilon_end        self.model = self.build_model()    def build_model(self):        # Crea un modello sequenziale con 3 livelli        model = Sequential([            # Il livello di input si aspetta una griglia appiattita, quindi la forma dell'input è la dimensione della griglia al quadrato            Dense(128, activation='relu', input_shape=(self.grid_size**2,)),            Dense(64, activation='relu'),            # Livello di output con 4 unità per le azioni possibili (su, giù, sinistra, destra)            Dense(4, activation='linear')        ])        model.compile(optimizer='adam', loss='mse')        return model    def get_action(self, state):        # rand() restituisce un valore casuale compreso tra 0 e 1        if np.random.rand() <= self.epsilon:            # Esplorazione: azione casuale            azione = np.random.randint(0, 4)        else:            # Aggiungi una dimensione extra allo stato per creare un batch con un'istanza            state = np.expand_dims(state, axis=0)            # Usa il modello per prevedere i valori Q (valori di azione) per lo stato dato            q_values = self.model.predict(state, verbose=0)            # Seleziona e restituisci l'azione con il valore Q più alto            azione = np.argmax(q_values[0]) # Prendi l'azione dal primo (e unico) elemento                # Decadimento del valore di epsilon per ridurre l'esplorazione nel tempo        if self.epsilon > self.epsilon_end:            self.epsilon *= self.epsilon_decay        return azione

Abbiamo implementato efficacemente la politica Epsilon-Greedy e siamo quasi pronti per consentire all'agente di imparare!

5. Incidere sull'ambiente: conclusione

Ambiente attualmente ha metodi per ripristinare la griglia, aggiungere l'agente e l'obiettivo, fornire lo stato corrente e stampare la griglia sulla console.

Perché l'ambiente sia completo, dobbiamo non solo consentire all'agente di influenzarlo, ma anche fornire un feedback sotto forma di ricompense.

Definire la struttura delle ricompenseCreare una buona struttura delle ricompense è la sfida principale dell'apprendimento per rinforzo. Il tuo problema potrebbe essere perfettamente all'interno delle capacità del modello, ma se la struttura delle ricompense non è configurata correttamente, potrebbe non imparare mai.

Lo scopo delle ricompense è quello di incoraggiare comportamenti specifici. Nel nostro caso vogliamo guidare l'agente verso la cella obiettivo, definita da -1.

Similmente ai livelli e ai neuroni nella rete, ed a epsilon e ai suoi valori correlati, esistono molti modi corretti (e molti sbagliati) per definire la struttura delle ricompense.

I due principali tipi di strutture delle ricompense:

  • Sparse: Quando le ricompense vengono assegnate solo in alcuni stati.
  • Dense: Quando le ricompense sono comuni in tutto lo spazio degli stati.

Con ricompense sparse, l'agente ha pochissimo feedback per guidarlo. Sarebbe come dare semplicemente una penalità fissa per ogni passo, e se l'agente raggiunge l'obiettivo fornisci una grande ricompensa.

L'agente può certamente imparare a raggiungere l'obiettivo, ma a seconda delle dimensioni dello spazio degli stati potrebbe richiedere molto tempo e potrebbe rimanere bloccato su una strategia subottimale.

Questo è in contrasto con le strutture di ricompensa dense, che consentono all'agente di addestrarsi più rapidamente e di comportarsi in modo più prevedibile.

Le strutture ricompensa dense

  • hanno più di un obiettivo.
  • danno suggerimenti durante un episodio.

L'agente ha quindi più opportunità per imparare il comportamento desiderato.

Ad esempio, immagina di allenare un agente a usare un corpo per camminare e la sola ricompensa che gli dai è quando raggiunge un obiettivo. L'agente potrebbe imparare a raggiungerlo semplicemente strisciando o rotolando per terra, o addirittura non imparare affatto.

Invece, se premi l'agente per dirigere verso l'obiettivo, mantenersi in piedi, mettere un piede davanti all'altro e stare eretto, otterrai una camminata molto più naturale e interessante, migliorando anche l'apprendimento.

Permettere all'agente di influenzare l'ambientePer avere delle ricompense, devi permettere all'agente di interagire con il suo mondo. Riprendiamo la classe Environment per definire questa interazione.

...def spostare_agente(self, azione):    # Mappa l'azione dell'agente al movimento corretto    movimenti = {        0: (-1, 0), # Su        1: (1, 0),  # Giù        2: (0, -1), # Sinistra        3: (0, 1)   # Destra    }        posizione_precedente = self.posizione_agente        # Determina la nuova posizione dopo aver applicato l'azione    movimento = movimenti[azione]    nuova_posizione = (posizione_precedente[0] + movimento[0], posizione_precedente[1] + movimento[1])        # Verifica se la mossa è valida    if self.posizione_valida(nuova_posizione):        # Rimuovi l'agente dalla vecchia posizione        self.griglia[posizione_precedente[0]][posizione_precedente[1]] = 0                # Aggiungi l'agente alla nuova posizione        self.griglia[nuova_posizione[0]][nuova_posizione[1]] = 1                 # Aggiorna la posizione dell'agente        self.posizione_agente = nuova_posizione            def posizione_valida(self, posizione):    # Verifica se la posizione è all'interno dei limiti della griglia    if (0 <= posizione[0] < self.dimensione_griglia) and (0 <= posizione[1] < self.dimensione_griglia):        return True    else:        return False

Il codice precedente definisce prima il cambiamento di coordinate associato a ogni valore di azione. Se l'azione 0 viene scelta, le coordinate cambiano di (-1, 0).

Ricorda, in questo scenario le coordinate vengono interpretate come (riga, colonna). Se la riga si abbassa di uno, l'agente si sposta di una cella verso l'alto, se la colonna si abbassa di uno, l'agente si sposta di una cella verso sinistra.

Viene quindi calcolata la nuova posizione in base al movimento. Se la nuova posizione è valida, viene aggiornata la posizione_agente. In caso contrario, la posizione_agente rimane la stessa.

Inoltre, posizione_valida controlla semplicemente se la nuova posizione è all'interno dei limiti della griglia.

Questo è abbastanza semplice, ma cosa ci manca? Il feedback!

Fornire un feedbackL'ambiente deve fornire una ricompensa appropriata e indicare se l'episodio è completo o no.

Incorporiamo prima il flag fatto per indicare che un episodio è finito.

...def spostare_agente(self, azione):    ...    fatto = False  # Di default l'episodio non è ancora finito              # Verifica se la mossa è valida    if self.posizione_valida(nuova_posizione):        # Rimuovi l'agente dalla vecchia posizione        self.griglia[posizione_precedente[0]][posizione_precedente[1]] = 0                # Aggiungi l'agente alla nuova posizione        self.griglia[nuova_posizione[0]][nuova_posizione[1]] = 1                 # Aggiorna la posizione dell'agente        self.posizione_agente = nuova_posizione                # Verifica se la nuova posizione è l'obiettivo        if self.posizione_agente == self.posizione_obiettivo:            # L'episodio è completo            fatto = True        return fatto...

Abbiamo impostato fatto su false di default. Se la nuova posizione_agente è uguale a posizione_obiettivo, allora fatto viene impostato su true. Infine, restituiamo questo valore.

Siamo pronti per la nostra struttura di ricompense. Prima, mostrerò l'implementazione per la struttura di ricompensa sparuta. Questa sarebbe sufficiente per una griglia di circa 5x5, ma la aggiorneremo per permettere un ambiente più grande.

Ricompense sparseL'implementazione di ricompense sparse è abbastanza semplice. Principalmente dobbiamo dare una ricompensa per atterrare sull'obiettivo.

Diamo anche una piccola ricompensa negativa per ogni passo che non atterra sull'obiettivo e una più grande per colpire il confine. Questo incoraggerà il nostro agente a dare la priorità al percorso più breve.

...def muovi_agente(self, azione):    ...    completato = False # L'episodio non è completato di default    ricompensa = 0   # Inizializza la ricompensa              # Verifica se la mossa è valida    if self.posizione_valida(nuova_posizione):        # Rimuovi l'agente dalla vecchia posizione        self.griglia[posizione_precedente[0]][posizione_precedente[1]] = 0                # Aggiungi l'agente alla nuova posizione        self.griglia[nuova_posizione[0]][nuova_posizione[1]] = 1                 # Aggiorna la posizione dell'agente        self.posizione_agente = nuova_posizione                # Verifica se la nuova posizione è la posizione del premio        if self.posizione_agente == self.posizione_premio:            # Ricompensa per aver raggiunto il premio            ricompensa = 100                          # L'episodio è completato            completato = True        else:            # Piccola penalità per una mossa valida che non ha raggiunto il premio            ricompensa = -1    else:        # Penalità leggermente maggiore per una mossa non valida        ricompensa = -3        return ricompensa, completato...

Assicurati di inizializzare ricompensa in modo che possa essere accessibile dopo i blocchi if. Verifica attentamente ogni caso: mossa valida e obiettivo raggiunto, mossa valida e obiettivo non raggiunto e mossa non valida.

Ricompense dense Mettere in pratica il nostro sistema di ricompense dense è ancora piuttosto semplice, comporta semplicemente fornire un feedback più frequentemente.

Come potremmo premiare l'agente per muoversi verso l'obiettivo in modo più graduale?

Il primo modo è restituire il negativo della distanza di Manhattan. La distanza di Manhattan è la distanza nella direzione delle righe, più la distanza nella direzione delle colonne, anziché considerare la distanza in linea d'aria. Ecco come appare il codice:

ricompensa = -(np.abs(self.posizione_premio[0] - nuova_posizione[0]) + \           np.abs(self.posizione_premio[1] - nuova_posizione[1]))

Quindi, il numero di passi nella direzione delle righe più il numero di passi nella direzione delle colonne, negato.

L'altro modo per farlo è fornire una ricompensa in base alla direzione in cui si muove l'agente: se si allontana dall'obiettivo, fornire una ricompensa negativa e se si avvicina fornire una ricompensa positiva.

Possiamo calcolare ciò sottraendo la nuova distanza di Manhattan dalla precedente distanza di Manhattan. Sarà o 1 o -1 perché l'agente può muoversi solo di una cella per volta.

Nel nostro caso avrebbe più senso scegliere la seconda opzione. Ciò dovrebbe fornire risultati migliori perché fornisce un feedback immediato in base a quel passo anziché una ricompensa più generale.

Il codice per questa opzione:

...def muovi_agente(self, azione):    ...        if self.posizione_agente == self.posizione_premio:            ...        else:            # Calcola la distanza prima della mossa            distanza_precedente = np.abs(self.posizione_premio[0] - posizione_precedente[0]) + \                                np.abs(self.posizione_premio[1] - posizione_precedente[1])                                # Calcola la distanza dopo la mossa            distanza_nuova = np.abs(self.posizione_premio[0] - nuova_posizione[0]) + \                           np.abs(self.posizione_premio[1] - nuova_posizione[1])                        # Se nuova_posizione è più vicina all'obiettivo, ricompensa = 1, se più lontana, ricompensa = -1            ricompensa = (distanza_precedente - distanza_nuova)    ...

Come puoi vedere, se l'agente non ha raggiunto l'obiettivo, calcoliamo distanza_precedente, distanza_nuova e definiamo ricompensa come la differenza tra queste due.

In base alle prestazioni potrebbe essere opportuno scalarla, o qualsiasi altra ricompensa nel sistema. Puoi farlo semplicemente moltiplicando per un numero (ad esempio, 0.01, 2, 100) se deve essere maggiore. Le loro proporzioni devono guidare efficacemente l'agente verso l'obiettivo. Ad esempio, una ricompensa di 1 per avvicinarsi all'obiettivo e una ricompensa di 0.1 per l'obiettivo stesso non avrebbe molto senso.

Le ricompense sono proporzionali. Se si scala ogni ricompensa positiva e negativa dello stesso fattore, non dovrebbe influire generalmente sulla formazione, a parte valori molto grandi o molto piccoli.

In sintesi, se l'agente è a 10 passi dall'obiettivo e si sposta in uno spazio distante 11 passi, allora ricompensa sarà -1.

Ecco il metodo aggiornato muovi_agente.

def muovi_agente(self, azione):    # Mappa l'azione dell'agente al movimento corretto    movimenti = {        0: (-1, 0), # Su        1: (1, 0),  # Giù        2: (0, -1), # Sinistra       3: (0, 1)   # Destra    }        posizione_precedente = self.posizione_agente        # Determina la nuova posizione dopo l'applicazione dell'azione    movimento = movimenti[azione]    nuova_posizione = (posizione_precedente[0] + movimento[0], posizione_precedente[1] + movimento[1])        completato = False # L'episodio non è completato di default    ricompensa = 0   # Inizializza la ricompensa              # Verifica se la mossa è valida    if self.posizione_valida(nuova_posizione):        # Rimuovi l'agente dalla vecchia posizione        self.griglia[posizione_precedente[0]][posizione_precedente[1]] = 0                # Aggiungi l'agente alla nuova posizione        self.griglia[nuova_posizione[0]][nuova_posizione[1]] = 1                 # Aggiorna la posizione dell'agente        self.posizione_agente = nuova_posizione                # Verifica se la nuova posizione è la posizione del premio        if self.posizione_agente == self.posizione_premio:            # Ricompensa per aver raggiunto il premio            ricompensa = 100                          # L'episodio è completato            completato = True        else:            # Calcola la distanza prima della mossa            distanza_precedente = np.abs(self.posizione_premio[0] - posizione_precedente[0]) + \                                np.abs(self.posizione_premio[1] - posizione_precedente[1])                        # Calcola la distanza dopo la mossa            distanza_nuova = np.abs(self.posizione_premio[0] - nuova_posizione[0]) + \                           np.abs(self.posizione_premio[1] - nuova_posizione[1])                        # Se nuova_posizione è più vicina all'obiettivo, ricompensa = 1, se più lontana, ricompensa = -1            ricompensa = (distanza_precedente - distanza_nuova)    else:        # Penalità leggermente maggiore per una mossa non valida        ricompensa = -3        return ricompensa, completato

La ricompensa per raggiungere l'obiettivo e tentare una mossa non valida dovrebbe rimanere la stessa con questa struttura.

Penalità di passoC'è solo una cosa che ci manca.

Attualmente l'agente non viene penalizzato per quanto tempo impiega per raggiungere l'obiettivo. La nostra struttura di ricompensa implementata ha molti loop netti neutrali. Potrebbe andare avanti e indietro tra due posizioni per sempre, accumulando nessuna penalità. Possiamo risolvere questo sottraendo un piccolo valore ad ogni passo, causando la penalità del muoversi via ad essere maggiore della ricompensa per avvicinarsi. Questa illustrazione dovrebbe renderla molto più chiara.

Percorsi di ricompensa con e senza penalità di passo - Immagine dell'autore

Immagina che l'agente stia partendo dal nodo più a sinistra e debba prendere una decisione. Senza una penalità di passo, potrebbe scegliere di andare avanti, poi tornare indietro quante volte vuole e la sua ricompensa totale sarebbe 1 prima di muoversi finalmente verso l'obiettivo.

Quindi matematicamente, fare un loop 1000 volte e poi muoversi verso l'obiettivo è altrettanto valido che muoversi direttamente lì.

Cerca di immaginare un loop in entrambi i casi e vedi come la penalità si accumula (o non si accumula).

Implementiamo questo.

...# Se new_location è più vicino all'obiettivo, reward = 0.9, se è più lontano, reward = -1.1reward = (previous_distance - new_distance) - 0.1...

Ecco fatto. L'agente ora dovrebbe essere incentivato a prendere il percorso più breve, prevenendo il comportamento di loop.

Ma qual è il punto?In questo momento potresti pensare che sia una perdita di tempo definire un sistema di ricompensa e addestrare un agente per un compito che potrebbe essere completato con algoritmi molto più semplici.

E avresti ragione.

La ragione per cui stiamo facendo ciò è imparare a pensare a guidare il tuo agente verso il suo obiettivo. In questo caso potrebbe sembrare banale, ma cosa succederebbe se l'ambiente dell'agente includesse oggetti da raccogliere, nemici da combattere, ostacoli da superare e altro?

O un robot nel mondo reale con decine di sensori e motori che devono essere coordinati in sequenza per navigare in ambienti complessi e variabili?

Progettare un sistema per fare queste cose utilizzando la programmazione tradizionale sarebbe abbastanza difficile e sicuramente non si comporterebbe in modo organico o generale come utilizzando RL e una buona struttura di ricompensa per incoraggiare un agente a imparare strategie ottimali.

Il reinforcement learning è più utile in applicazioni in cui definire la sequenza esatta di passaggi necessari per completare il compito è difficile o impossibile a causa della complessità e variabilità dell'ambiente. L'unica cosa di cui hai bisogno per far funzionare l'RL è essere in grado di definire quale è il comportamento utile e quale comportamento dovrebbe essere scoraggiato.

Il metodo Environment finale -step.Con tutti i componenti di Environment al loro posto, possiamo ora definire il cuore dell'interazione tra l'agente e l'ambiente.

Fortunatamente, è abbastanza semplice.

def step(self, action):    # Applica l'azione all'ambiente, registra le osservazioni    reward, done = self.move_agent(action)    next_state = self.get_state()      # Renderizza la griglia ad ogni passo    if self.render_on:        self.render()      return reward, next_state, done

step sposta prima l'agente nell'ambiente e registra reward e done. Quindi ottiene lo stato immediatamente successivo a questa interazione, next_state. Poi, se render_on è impostato su true, la griglia viene renderizzata.

Infine, step restituisce i valori registrati, reward, next_state e done.

Questi saranno essenziali per costruire le esperienze da cui l'agente imparerà.

Congratulazioni! Hai ufficialmente completato la costruzione dell'ambiente per la tua palestra DRL.

In basso è presente la classe Environment completata.

import randomimport numpy as npclass Environment:    def __init__(self, grid_size, render_on=False):        self.grid_size = grid_size        self.render_on = render_on        self.grid = []        self.agent_location = None        self.goal_location = None    def reset(self):        # Inizializza la griglia vuota come un array 2D di 0s        self.grid = np.zeros((self.grid_size, self.grid_size))        # Aggiungi l'agente e l'obiettivo alla griglia        self.agent_location = self.add_agent()        self.goal_location = self.add_goal()        # Renderizza la griglia iniziale        if self.render_on:            self.render()        # Restituisci lo stato iniziale        return self.get_state()    def add_agent(self):        # Scegli una posizione casuale        location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))                # L'agente è rappresentato da un 1        self.grid[location[0]][location[1]] = 1        return location    def add_goal(self):        # Scegli una posizione casuale        location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))        # Ottieni una posizione casuale finché non è occupata        while self.grid[location[0]][location[1]] == 1:            location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))                # L'obiettivo è rappresentato da un -1        self.grid[location[0]][location[1]] = -1        return location    def move_agent(self, action):        # Mappa l'azione dell'agente al movimento corretto        moves = {            0: (-1, 0), # Su            1: (1, 0),  # Giù            2: (0, -1), # Sinistra            3: (0, 1)   # Destra        }                previous_location = self.agent_location                # Determina la nuova posizione dopo l'applicazione dell'azione        move = moves[action]        new_location = (previous_location[0] + move[0], previous_location[1] + move[1])                done = False  # L'episodio non è completo per impostazione predefinita        reward = 0   # Inizializza la ricompensa                # Verifica una mossa valida        if self.is_valid_location(new_location):            # Rimuovi l'agente dalla vecchia posizione            self.grid[previous_location[0]][previous_location[1]] = 0                        # Aggiungi l'agente alla nuova posizione            self.grid[new_location[0]][new_location[1]] = 1                        # Aggiorna la posizione dell'agente            self.agent_location = new_location                        # Controlla se la nuova posizione è l'obiettivo            if self.agent_location == self.goal_location:                # Ricompensa per raggiungere l'obiettivo                reward = 100                                # L'episodio è completo                done = True            else:                # Calcola la distanza prima della mossa                previous_distance = np.abs(self.goal_location[0] - previous_location[0]) + \                                   

Siamo arrivati a questo punto dopo molte esperienze. Potrebbe essere utile riprendere la grande immagine all'inizio e rivalutare come ogni parte interagisce usando le tue nuove conoscenze prima di proseguire.

6. Imparare dalle esperienze: Experience Replay

Il modello e la politica dell'agente, insieme alla struttura di ricompensa dell'ambiente e al meccanismo per compiere azioni, sono stati completati, ma abbiamo bisogno di un modo per ricordare il passato in modo che l'agente possa imparare da esso.

Questo può essere fatto salvando le esperienze.

Ogni esperienza consiste in alcune cose:

  • Stato: Lo stato prima che venga compiuta un'azione.
  • Azione: Quale azione è stata compiuta in questo stato.
  • Ricompensa: Feedback positivo o negativo che l'agente riceve dall'ambiente in base alla sua azione.
  • Stato successivo: Lo stato immediatamente successivo all'azione, che consente all'agente di agire non solo in base alle conseguenze dello stato attuale, ma a molti stati in anticipo.
  • Fatto: Indica la fine di un'esperienza, permettendo all'agente di sapere se il compito è stato completato o no. Può essere vero o falso in ogni passaggio.

Questi termini non dovrebbero essere nuovi per te, ma non fa mai male rivederli!

Ogni esperienza è associata a esattamente un passaggio dell'agente. Questo fornirà tutto il contesto necessario per addestrarlo.

La classe ExperienceReplayPer tenere traccia e fornire queste esperienze quando necessario, definiremo una classe finale, ExperienceReplay.

from collections import deque, namedtupleclass ExperienceReplay:    def __init__(self, capacity, batch_size):        # La memoria memorizza le esperienze in un deque, quindi se la capacità viene superata, rimuove        # l'elemento più vecchio in modo efficiente        self.memory = deque(maxlen=capacity)        # La dimensione del batch specifica il numero di esperienze che saranno campionate contemporaneamente        self.batch_size = batch_size        # Experience è un namedtuple che memorizza le informazioni rilevanti per l'addestramento        self.Experience = namedtuple('Experience', ['state', 'action', 'reward', 'next_state', 'done'])

Questa classe riceverà capacity, un valore intero che definisce il numero massimo di esperienze che salveremo in un determinato momento, e batch_size, un valore intero che determina quante esperienze campioniamo contemporaneamente per l'addestramento.

Il raggruppamento delle esperienze Se ti ricordi, la rete neurale nella classe Agent prende gruppi di input. Sebbene abbiamo usato solo un gruppo di dimensione uno per fare previsioni, sarebbe incredibilmente inefficiente per l'addestramento. Di solito, i gruppi di dimensioni 32 o superiori sono più comuni.

Raggruppando l'input per l'addestramento si ottengono due cose:

  • Aumenta l'efficienza perché consente il processamento parallelo di più punti dati, riducendo l'overhead computazionale e facendo un uso migliore delle risorse della GPU o della CPU.
  • Aiuta il modello a imparare in modo più consistente, dato che impara da una varietà di esempi contemporaneamente, il che può migliorare la sua capacità di gestire dati nuovi e non visti.

Memoria La memory sarà un deque (abbreviazione di double-ended queue, coda a doppio accesso). Questo ci consente di aggiungere nuove esperienze all'inizio e, una volta raggiunta la lunghezza massima definita da capacity, il deque le rimuoverà senza dover spostare ciascun elemento come accade con una lista Python. Questo può migliorare notevolmente la velocità quando capacity è impostata a 10.000 o più.

Esperienza Ogni esperienza sarà definita come un namedtuple. Anche se molte altre strutture dati potrebbero funzionare, questa migliora la leggibilità poiché estraiamo ogni parte come necessario nell'addestramento.

add_experience e sample_batch implementazione Aggiungere una nuova esperienza e campionare un gruppo sono piuttosto semplici.

import randomdef add_experience(self, state, action, reward, next_state, done):    # Crea una nuova esperienza e memorizzala nella memoria    experience = self.Experience(state, action, reward, next_state, done)    self.memory.append(experience)def sample_batch(self):    # Il gruppo sarà un campione casuale di esperienze dalla memoria di dimensione batch_size    batch = random.sample(self.memory, self.batch_size)    return batch

Il metodo add_experience crea una namedtuple con ogni parte di un'esperienza, state, action, reward, next_state e done, e lo aggiunge a memory.

sample_batch è altrettanto semplice. Prende e restituisce un campione casuale da memory di dimensione batch_size.

Experience Replay che memorizza le esperienze per l'Agente da raggruppare e apprendere - Immagine dell'autore

L'ultimo metodo necessario - can_provide_sampleInfine, sarebbe utile essere in grado di verificare se memory contiene abbastanza esperienze per fornirci un campione completo prima di tentare di ottenere un batch per l'addestramento.

def can_provide_sample(self):    # Determina se la lunghezza di memory ha superato batch_size    return len(self.memory) >= self.batch_size

Classe ExperienceReplay completata...

import randomfrom collections import deque, namedtupleclass ExperienceReplay:    def __init__(self, capacity, batch_size):        # Memory memorizza le esperienze in un deque, quindi se la capacità viene superata        # rimuove l'elemento più vecchio in modo efficiente        self.memory = deque(maxlen=capacity)        # Il batch size specifica il numero di esperienze che vengono campionate contemporaneamente        self.batch_size = batch_size        # Experience è una namedtuple che memorizza le informazioni rilevanti per l'addestramento        self.Experience = namedtuple('Experience', ['state', 'action', 'reward', 'next_state', 'done'])    def add_experience(self, state, action, reward, next_state, done):        # Crea una nuova esperienza e la memorizza in memory        experience = self.Experience(state, action, reward, next_state, done)        self.memory.append(experience)    def sample_batch(self):        # Il batch sarà un campione casuale di esperienze di dimensione batch_size        batch = random.sample(self.memory, self.batch_size)        return batch    def can_provide_sample(self):        # Determina se la lunghezza di memory ha superato batch_size        return len(self.memory) >= self.batch_size

Con il meccanismo per salvare ogni esperienza e campionare da esse in atto, possiamo tornare alla classe Agent per finalmente abilitare l'apprendimento.

7. Definire il processo di apprendimento dell'Agente: Ottimizzare la rete neurale

L'obiettivo, durante l'addestramento della rete neurale, è ottenere che i valori Q che produce rappresentino accuratamente la ricompensa futura che ogni scelta fornirà.

In sostanza, vogliamo che la rete impari a predire quanto valore ha ciascuna decisione, considerando non solo la ricompensa immediata, ma anche le ricompense che potrebbe comportare in futuro.

Incorporazione delle ricompense futurePer raggiungere questo obiettivo, incorporiamo i valori Q dello stato successivo nel processo di addestramento.

Quando l'agente compie un'azione e si sposta a un nuovo stato, guardiamo i valori Q in questo nuovo stato per aiutarci a valutare il valore dell'azione precedente. In altre parole, le ricompense future potenziali influenzano il valore percepito delle scelte attuali.

Il metodo learn

import numpy as npdef learn(self, experiences):    states = np.array([experience.state for experience in experiences])    actions = np.array([experience.action for experience in experiences])    rewards = np.array([experience.reward for experience in experiences])    next_states = np.array([experience.next_state for experience in experiences])    dones = np.array([experience.done for experience in experiences])    # Prevedi i valori Q (valori di azione) per il batch di stati dato    current_q_values = self.model.predict(states, verbose=0)    # Prevedi i valori Q per il batch di next_state    next_q_values = self.model.predict(next_states, verbose=0)    ...

Utilizzando il batch fornito, experiences, estraiamo ogni parte utilizzando la comprensione della lista e i valori della namedtuple che abbiamo definito in precedenza in ExperienceReplay. Quindi convertiamo ciascuno in un array NumPy per migliorare l'efficienza e allinearci a ciò che ci si aspetta dal modello, come spiegato in precedenza.

Finalmente, utilizziamo il modello per prevedere i valori Q dello stato corrente in cui è stata presa l'azione e dello stato immediatamente successivo.

Prima di continuare con il metodo learn, devo spiegare qualcosa chiamata fattore di sconto.

Scontare le ricompense future - il ruolo di gammaIntuitivamente, sappiamo che le ricompense immediate sono generalmente prioritizzate quando tutto il resto è uguale. (Ti piacerebbe ricevere il tuo stipendio oggi o la prossima settimana?)

Rappresentare questo concetto matematicamente può sembrare molto meno intuitivo. Quando consideriamo il futuro, non vogliamo che sia altrettanto importante (ponderato) come il presente. Di quanto dobbiamo scontare il futuro, ovvero ridurne l'effetto su ogni decisione, è definito da gamma (comunemente indicato con la lettera greca γ).

Gamma può essere regolato, con valori più alti che incoraggiano la pianificazione e valori più bassi che incoraggiano un comportamento a breve vista. Useremo un valore predefinito di 0,99.

Il fattore di sconto sarà quasi sempre compreso tra 0 e 1. Un fattore di sconto maggiore di 1, che mette in primo piano il futuro rispetto al presente, comporterebbe un comportamento instabile e ha poche o nessuna applicazione pratica.

Implementare gamma e definire i valori Q di destinazioneRicorda che nel contesto dell'addestramento di una rete neurale, il processo si basa su due elementi chiave: i dati di input che forniamo e gli output corrispondenti che vogliamo che la rete impari a prevedere.

Dovremo fornire alla rete alcuni valori Q di destinazione che vengono aggiornati in base alla ricompensa data dall'ambiente in questo stato e nell'azione specifica, oltre alla ricompensa prevista scontata (per gamma) dell'azione migliore nello stato successivo.

So che è molto da comprendere, ma sarà meglio spiegato attraverso l'implementazione e l'esempio.

import numpy as np...class Agente:    def __init__(self, dimensione_griglia, epsilon=1, decadimento_epsilon=0.995, epsilon_fine=0.01, gamma=0.99):        ...        self.gamma = gamma        ...    ...    def apprendi(self, esperienze):        ...        # Inizializziamo i valori Q di destinazione come i valori Q correnti        valori_q_di_destinazione = valori_q_correnti.copy()        # Iteriamo attraverso ogni esperienza nel batch        for i in range(len(esperienze)):            if fatto[i]:                # Se l'episodio è terminato, non esiste un successivo valore Q                # [i, azioni[i]] è l'equivalente numpy di [i][azioni[i]]                valori_q_di_destinazione[i, azioni[i]] = ricompense[i]            else:                # Il valore Q aggiornato è la ricompensa più il valore Q massimo scontato per lo stato successivo                # [i, azioni[i]] è l'equivalente numpy di [i][azioni[i]]                valori_q_di_destinazione[i, azioni[i]] = ricompense[i] + self.gamma * np.max(valori_q_successivi[i])        ...

Abbiamo definito l'attributo di classe, gamma, con un valore predefinito di 0,99.

Quindi, dopo aver ottenuto la previsione per stato e stato_successivo che abbiamo implementato sopra, inizializziamo valori_q_di_destinazione ai valori Q correnti. Questi verranno aggiornati nel ciclo seguente.

Aggiornamento valori_q_di_destinazioneIteriamo attraverso ogni esperienza nel batch con due casi per l'aggiornamento dei valori:

  • Se l'episodio è fatto, il valore_q_di_destinazione per quell'azione è semplicemente la ricompensa data perché non esiste un valore_q_successivo rilevante.
  • In caso contrario, l'episodio non è fatto e il valore_q_di_destinazione per quell'azione diventa la ricompensa data, più il valore Q scontato dell'azione successiva prevista in valori_q_successivi.

Aggiornamento se fatto è vero:

valori_q_di_destinazione[i, azioni[i]] = ricompense[i]

Aggiornamento se fatto è falso:

valori_q_di_destinazione[i, azioni[i]] = ricompense[i] + self.gamma * np.max(valori_q_successivi[i])

La sintassi qui, valori_q_di_destinazione[i, azioni[i]], può sembrare confusa ma in sostanza si tratta del valore Q dell'esperienza i-esima, per l'azione azioni[i].

      Esperienza nel batch   Ricompensa dall'ambiente                v                    vvalori_q_di_destinazione[i, azioni[i]] = ricompense[i]                       ^           Indice dell'azione scelta

Questa è l'equivalente di NumPy di [i][actions[i]] nelle liste Python. Ricorda che ogni azione è un indice (da 0 a 3).

Come viene aggiornato target_q_values Solo per illustrare questo in modo più chiaro, mostrerò come target_q_values si allinea più strettamente con le ricompense effettive fornite durante l'addestramento. Ricorda che stiamo lavorando con un batch. Questo sarà un batch di tre elementi con valori di esempio per semplicità.

Inoltre, assicurati di capire che le voci in experiences sono indipendenti. Ciò significa che non si tratta di una sequenza di passaggi, ma di un campione casuale da una raccolta di esperienze individuali.

Pretendi che i valori di actions, rewards, dones, current_q_values e next_q_values siano i seguenti.

gamma = 0.99 actions = [1, 2, 2]  # (giù, sinistra, sinistra)rewards = [1, -1, 100] # Ricompense fornite dall'ambiente per l'azione dones = [False, False, True] # Indica se l'episodio è completo current_q_values = [    [2, 5, -2, -3],  # In questo stato, l'azione 2 (indice 1) é stata finora la migliore    [1, 3, 4, -1],   # Qui, l'azione 3 (indice 2) è attualmente favorita    [-3, 2, 6, 1]    # L'azione 3 (indice 2) ha il valore di Q più alto in questo stato]next_q_values = [    [1, 4, -1, -2],  # Valori di Q futuri dopo aver eseguito ogni azione dal primo stato    [2, 2, 5, 0],    # Valori di Q futuri dal secondo stato    [-2, 3, 7, 2]    # Valori di Q futuri dal terzo stato]

Copiamo quindi current_q_values in target_q_values per essere aggiornati.

target_q_values = current_q_values

Quindi, per ogni esperienza nel batch, possiamo mostrare i valori associati.

Questo non è codice, ma semplicemente un esempio dei valori in ogni fase. Se ti perdi, assicurati di fare riferimento ai valori iniziali per vedere da dove vengono.

Voce 1

i = 0 # Questa è la prima voce nel batch (primo ciclo) # Prime voci dei valori associatialla i-esima voce delle azioni actions[i] = 1rewards[i] = 1dones[i] = Falsetarget_q_values[i] = [2, 5, -2, -3]next_q_values[i] = [1, 4, -1, -2]

Poiché dones[i] è falso per questa esperienza, dobbiamo considerare i next_q_values e applicare gamma (0.99).

target_q_values[i, actions[i]] = rewards[i] + 0.99 * max(next_q_values[i])

Perché ottenere il valore più grande di next_q_values[i]? Perché quella sarebbe stata l'azione successiva scelta e vogliamo la ricompensa stimata (valore di Q).

Poi aggiorniamo il target_q_values della i-esima esperienza all'indice corrispondente a actions[i] con la ricompensa per questa coppia stato/azione più la ricompensa scontata per la prossima coppia stato/azione.

Ecco i valori target in questa esperienza dopo essere stati aggiornati.

# Updated target_q_values[i]target_q_values[i] = [2, 4.96, -2, -3]                ^          ^              i = 0    action[i] = 1

Come puoi vedere, per lo stato corrente, scegliere 1 (giù) è ora ancora più desiderabile perché il valore è più alto e questo comportamento è stato rafforzato.

Può essere utile calcolarli da soli per renderli davvero chiari.

Voce 2

i = 1 # Questa è la seconda voce nel batch # Seconde voci dei valori associatialla i-esima voce delle azioni actions[i] = 2rewards[i] = -1dones[i] = Falsetarget_q_values[i] = [1, 3, 4, -1]next_q_values[i] = [2, 2, 5, 0]

dones[i] è anche falso qui, quindi dobbiamo considerare i next_q_values.

target_q_values[i, actions[i]] = rewards[i] + 0.99 * max(next_q_values[i])

Di nuovo, aggiornando i target_q_values dell'esperienza i-esima all'indice actions[i].

# target_q_values[i] aggiornato target_q_values[i] = [1, 3, 3.95, -1]                ^             ^              i = 1      actions[i] = 2

Scegliere 2 (sinistra) è ora meno desiderabile perché il valore Q è più basso e questo comportamento è scoraggiato.

Voce 3

Infine, l'ultima voce nel gruppo.

i = 2 # Questa è la terza e ultima voce nel gruppo# Seconda voce di valori associatiala[i] = 2ricompense[i] = 100dones[i] = Truetarget_q_values[i] = [-3, 2, 6, 1]next_q_values[i] = [-2, 3, 7, 2]

dones[i] per questa voce è vero, indicando che l'episodio è completo e non saranno effettuate ulteriori azioni. Ciò significa che non consideriamo i next_q_values nel nostro aggiornamento.

target_q_values[i, actions[i]] = rewards[i]

Notare che impostiamo semplicemente target_q_values[i, actions[i]] al valore di rewards[i], perché non saranno prese altre azioni, non c'è futuro da considerare.

# target_q_values[i] aggiornato target_q_values[i] = [-3, 2, 100, 1]                ^             ^              i = 2       actions[i] = 2

Scegliere 2 (sinistra) in questo e in stati simili sarà ora molto più desiderabile.

Questo è lo stato in cui l'obiettivo era alla sinistra dell'agente, quindi quando è stata scelta quell'azione è stata data la ricompensa completa.

Anche se può sembrare piuttosto confuso, l'idea è semplicemente quella di creare valori Q aggiornati che rappresentino accuratamente le ricompense date dall'ambiente per fornirle alla rete neurale. Questo è ciò che la NN dovrebbe approssimare.

Cerca di immaginarlo al contrario. Poiché la ricompensa per raggiungere l'obiettivo è sostanziale, creerà un effetto di propagazione attraverso gli stati che conducono a quello in cui l'agente raggiunge l'obiettivo. Questo è il potere di gamma nel considerare lo stato successivo e il suo ruolo nel triplice dei valori di ricompensa all'indietro attraverso lo spazio degli stati.

Effetto di propagazione delle ricompense attraverso lo spazio degli stati - Immagine dell'autore

Sopra è mostrata una versione semplificata dei valori Q e l'effetto del fattore di sconto, considerando solo la ricompensa per l'obiettivo, non le ricompense o le penalità incrementali.

Scegli una cella nella griglia e spostati alla cella adiacente di qualità più alta. Vedrai che fornisce sempre un percorso ottimale all'obiettivo.

Questo effetto non è immediato. Richiede che l'agente esplori lo spazio degli stati e delle azioni per imparare gradualmente e adattare la sua strategia, costruendo una comprensione di come diverse azioni portano a diverse ricompense nel tempo.

Se la struttura delle ricompense è stata attentamente progettata, questo guiderà lentamente il nostro agente verso l'assunzione di azioni più vantaggiose.

Adattamento della rete neuralePer il metodo learn, l'ultimo passaggio da fare è fornire alla rete neurale dell'agente gli stati e i loro target_q_values associati. TensorFlow gestirà poi l'aggiornamento dei pesi per prevedere più accuratamente questi valori sugli stati simili.

...def learn(self, esperienze):    stati = np.array([esperienza.stato for esperienza in esperienze])    azioni = np.array([esperienza.azione for esperienza in esperienze])    ricompense = np.array([esperienza.ricompensa for esperienza in esperienze])    stati_successivi = np.array([esperienza.stato_successivo for esperienza in esperienze])    dones = np.array([esperienza.conclusa for esperienza in esperienze])    # Prevedi i valori Q (valori delle azioni) per il gruppo di stati dato    valori_q_correnti = self.model.predict(stati, verbose=0)    # Prevedi i valori Q per il gruppo di stati_successivi    valori_q_successivi = self.model.predict(stati_successivi, verbose=0)    # Inizializza i target Q-values come i valori Q correnti    target_q_values = valori_q_correnti.copy()    # Loop attraverso ogni esperienza nel gruppo    for i in range(len(esperienze)):        if dones[i]:            # Se l'episodio è concluso, non ci sono valori Q successivi            target_q_values[i, azioni[i]] = ricompense[i]        else:            # Il valore Q aggiornato è la ricompensa più il massimo valore Q scontato per lo stato successivo            # [i, azioni[i]] è l'equivalente numpy di [i][azioni[i]]            target_q_values[i, azioni[i]] = ricompense[i] + self.gamma * np.max(valori_q_successivi[i])    # Allenare il modello    self.model.fit(stati, target_q_values, epochs=1, verbose=0)

L'unica parte nuova è self.model.fit(states, target_q_values, epochs=1, verbose=0). fit prende due argomenti principali: i dati di input e i valori target che desideriamo. In questo caso, il nostro input è un batch di states e i valori target sono i Q-values aggiornati per ogni stato.

epochs=1 imposta semplicemente il numero di volte che si desidera che la rete cerchi di adattarsi ai dati. Uno è sufficiente perché vogliamo che sia in grado di generalizzare bene, non di adattarsi a questo batch specifico. verbose=0 dice semplicemente a TensorFlow di non stampare messaggi di debug come le barre di progresso.

La classe Agent è ora dotata della capacità di imparare da esperienze, ma ha bisogno di altri due semplici metodi: save e load.

Salvataggio e caricamento dei modelli addestratiIl salvataggio e il caricamento del modello ci evita di doverlo addestrare completamente ogni volta che ne abbiamo bisogno. Possiamo utilizzare i semplici metodi di TensorFlow che richiedono solo un argomento, file_path.

from tensorflow.keras.models import load_modeldef load(self, file_path):    self.model = load_model(file_path)def save(self, file_path):    self.model.save(file_path)

Crea una directory chiamata modelli, o come preferisci, e quindi puoi salvare il tuo modello addestrato a intervalli regolari. Questi file terminano in .h5. Quindi ogni volta che vuoi salvare il tuo modello, semplicemente chiama agent.save('models/nome_modello.h5'). Lo stesso vale quando si desidera caricare uno.

Classe Agent completa

from tensorflow.keras.layers import Densefrom tensorflow.keras.models import Sequential, load_modelimport numpy as npclass Agent:    def __init__(self, grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01, gamma=0.99):        self.grid_size = grid_size        self.epsilon = epsilon        self.epsilon_decay = epsilon_decay        self.epsilon_end = epsilon_end        self.gamma = gamma    def build_model(self):        # Crea un modello sequenziale con 3 strati        model = Sequential([            # Lo strato di input si aspetta una griglia appiattita, quindi la shape dell'input è grid_size al quadrato            Dense(128, activation='relu', input_shape=(self.grid_size**2,)),            Dense(64, activation='relu'),            # Lo strato di output con 4 unità per le possibili azioni (su, giù, sinistra, destra)            Dense(4, activation='linear')        ])        model.compile(optimizer='adam', loss='mse')        return model        def get_action(self, state):        # rand() restituisce un valore casuale compreso tra 0 e 1        if np.random.rand() <= self.epsilon:            # Esplorazione: azione casuale            action = np.random.randint(0, 4)        else:            # Aggiungi una dimensione in più allo stato per creare un batch con una sola istanza            state = np.expand_dims(state, axis=0)            # Usa il modello per predire i Q-values (valori azione) per lo stato dato            q_values = self.model.predict(state, verbose=0)            # Seleziona e restituisci l'azione con il valore Q più alto            action = np.argmax(q_values[0]) # Prendi l'azione dalla prima (e unica) entry                # Decadimento del valore di epsilon per ridurre l'esplorazione nel tempo        if self.epsilon > self.epsilon_end:            self.epsilon *= self.epsilon_decay        return action        def learn(self, experiences):        states = np.array([experience.state for experience in experiences])        actions = np.array([experience.action for experience in experiences])        rewards = np.array([experience.reward for experience in experiences])        next_states = np.array([experience.next_state for experience in experiences])        dones = np.array([experience.done for experience in experiences])        # Predici i Q-values (valori azione) per il batch di stati dato        current_q_values = self.model.predict(states, verbose=0)        # Predici i Q-values per il batch di next_state        next_q_values = self.model.predict(next_states, verbose=0)        # Inizializza i target Q-values come i current Q-values        target_q_values = current_q_values.copy()        # Cicla su ogni esperienza nel batch        for i in range(len(experiences)):            if dones[i]:                # Se l'episodio è terminato, non c'è un prossimo Q-value                target_q_values[i, actions[i]] = rewards[i]            else:                # Il Q-value aggiornato è la ricompensa più il Q-value massimo scontato per lo stato successivo                # [i, actions[i]] è l'equivalente numpy di [i][actions[i]]                target_q_values[i, actions[i]] = rewards[i] + self.gamma * np.max(next_q_values[i])        # Addestra il modello        self.model.fit(states, target_q_values, epochs=1, verbose=0)        def load(self, file_path):        self.model = load_model(file_path)    def save(self, file_path):        self.model.save(file_path)

Ogni classe del tuo ambiente di deep reinforcement learning è ora completa! Hai codificato con successo Agent, Environment e ExperienceReplay. L'unica cosa che resta è il ciclo di addestramento principale.

8. Eseguire il ciclo di addestramento: mettendo tutto insieme

Siamo nell'ultima fase del progetto! Ogni pezzo che abbiamo codificato, Agent, Environment e ExperienceReplay, ha bisogno di un modo per interagire.

Questo sarà il programma principale in cui viene eseguito ogni episodio e in cui definiamo i nostri iperparametri come epsilon.

Anche se è piuttosto semplice, dividerò ogni parte man mano che la codificheremo per renderla più chiara.

Inizializziamo ogni partePrima di tutto, impostiamo grid_size e utilizziamo le classi che abbiamo creato per inizializzare ogni istanza.

from environment import Environmentfrom agent import Agentfrom experience_replay import ExperienceReplayif __name__ == '__main__':    grid_size = 5    environment = Environment(grid_size=grid_size, render_on=True)    agent = Agent(grid_size=grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01)    experience_replay = ExperienceReplay(capacity=10000, batch_size=32)    ...

Ora abbiamo ogni parte di cui abbiamo bisogno per il ciclo di addestramento principale.

Episodi e limite di stepSuccessivamente, definiremo il numero di episodi che vogliamo eseguire l'addestramento e il numero massimo di step consentiti in ogni episodio.

Limitare il numero di step aiuta a garantire che il nostro agente non rimanga bloccato in un loop e favorisce percorsi più brevi. Saremo piuttosto generosi e per una griglia 5x5 impostermo il valore massimo a 200. Questo dovrà essere aumentato per ambienti più grandi.

from environment import Environmentfrom agent import Agentfrom experience_replay import ExperienceReplayif __name__ == '__main__':    grid_size = 5    environment = Environment(grid_size=grid_size, render_on=True)    agent = Agent(grid_size=grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01)    experience_replay = ExperienceReplay(capacity=10000, batch_size=32)        # Numero di episodi prima che l'addestramento si interrompa    episodi = 5000    # Numero massimo di step in ogni episodio    max_steps = 200    ...

Ciclo degli episodiIn ogni episodio reimpostiamo environment e salviamo lo stato iniziale. Quindi eseguiamo ogni step fino a quando sia done sia max_steps vengono raggiunti. Infine, salviamo il modello. La logica per ogni step non è ancora stata implementata.

from environment import Environmentfrom agent import Agentfrom experience_replay import ExperienceReplayif __name__ == '__main__':    grid_size = 5    environment = Environment(grid_size=grid_size, render_on=True)    agent = Agent(grid_size=grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01)    experience_replay = ExperienceReplay(capacity=10000, batch_size=32)        # Numero di episodi prima che l'addestramento si interrompa    episodi = 5000    # Numero massimo di step in ogni episodio    max_steps = 200    for episodio in range(episodi):        # Otteniamo lo stato iniziale dell'ambiente e impostiamo done su False        stato = environment.reset()        # Loop fino al termine dell'episodio        for step in range(max_steps):            # Logica per ogni step            ...            if done:                break            agent.save(f'modelli/modello_{grid_size}.h5')

Notare che nominiamo il modello utilizzando grid_size perché l'architettura della NN sarà diversa per ogni dimensione dell'input. Provare a caricare un modello 5x5 in un'architettura 10x10 provocherà un errore.

Logica degli stepInfine, all'interno del ciclo di step predisporremo l'interazione tra ogni pezzo come discusso in precedenza.

from environment import Environmentfrom agent import Agentfrom experience_replay import ExperienceReplayif __name__ == '__main__':    grid_size = 5    environment = Environment(grid_size=grid_size, render_on=True)    agent = Agent(grid_size=grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01)    experience_replay = ExperienceReplay(capacity=10000, batch_size=32)        # Numero di episodi prima che l'addestramento si interrompa    episodi = 5000    # Numero massimo di step in ogni episodio    max_steps = 200    for episodio in range(episodi):        # Otteniamo lo stato iniziale dell'ambiente e impostiamo done su False        stato = environment.reset()        # Loop fino al termine dell'episodio        for step in range(max_steps):            print('Episodio:', episodio)            print('Step:', step)            print('Epsilon:', agent.epsilon)            # Otteniamo la scelta dell'azione dalla politica degli agenti            azione = agent.get_action(stato)            # Facciamo un passo nell'ambiente e salviamo l'esperienza            reward, prossimo_stato, done = environment.step(azione)            experience_replay.add_experience(stato, azione, reward, prossimo_stato, done)            # Se l'esperienza replay ha memoria sufficiente per fornire un campione, addestriamo l'agente            if experience_replay.can_provide_sample():                esperienze = experience_replay.sample_batch()                agent.learn(esperienze)            # Impostiamo lo stato al prossimo_stato            stato = prossimo_stato                        if done:                break            agent.save(f'modelli/modello_{grid_size}.h5')

Per ogni passaggio dell'episodio, iniziamo stampando il numero dell'episodio e del passaggio per darci alcune informazioni su dove siamo nella formazione. Inoltre, è possibile stampare epsilon per vedere quale percentuale delle azioni dell'agente è casuale. Aiuta anche perché, se si vuole interrompere per qualsiasi motivo, è possibile riavviare l'agente allo stesso valore di epsilon.

Dopo aver stampato le informazioni, utilizziamo la politica dell'agente per ottenere azione da questo stato per compiere un passo nel mondo, registrando i valori restituiti.

Poi salviamo stato, azione, ricompensa, prossimo_stato e fatto come esperienza. Se esperienza_ricorrente ha memoria sufficiente, addestriamo l'agente su un batch casuale di esperienze.

Infine, impostiamo stato a prossimo_stato e verifichiamo se l'episodio è finito.

Dopo aver eseguito almeno un episodio avrai un modello salvato che puoi caricare e continuare da dove ti sei fermato o valutarne le prestazioni.

Dopo aver inizializzato l'agente, utilizza semplicemente il suo metodo di caricamento in modo simile a come abbiamo salvato — agente.carica(f'modelli/modello_{dimensione_griglia}.h5')

Puoi anche aggiungere un leggero ritardo ad ogni passaggio durante la valutazione del modello utilizzando time.sleep(0.5). Questo fa sì che ogni passaggio si metta in pausa per mezzo secondo. Assicurati di includere import time.

Ciclo di formazione completato

from environment import Environmentfrom agent import Agentfrom experience_replay import ExperienceReplayimport timeif __name__ == '__main__':    dimensione_griglia = 5    ambiente = Ambiente(dimensione_griglia=dimensione_griglia, render_on=True)    agente = Agente(dimensione_griglia=dimensione_griglia, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01)    # agente.carica(f'modelli/modello_{dimensione_griglia}.h5')    esperienza_ricorrente = EsperienzaRicorrente(capacità=10000, batch_size=32)        # Numero di episodi da eseguire prima che la formazione si interrompa    episodi = 5000    # Numero massimo di passi in ogni episodio    max_passi = 200    for episodio in range(episodi):        # Ottieni lo stato iniziale dell'ambiente e imposta fatto a False        stato = ambiente.reset()        # Loop fino alla fine dell'episodio        for passo in range(max_passi):            print('Episodio:', episodio)            print('Passo:', passo)            print('Epsilon:', agente.epsilon)            # Ottieni la scelta dell'azione dalla politica degli agenti            azione = agente.scegli_azione(stato)            # Fai un passo nell'ambiente e salva l'esperienza            ricompensa, prossimo_stato, fatto = ambiente.passo(azione)            esperienza_ricorrente.aggiungi_esperienza(stato, azione, ricompensa, prossimo_stato, fatto)            # Se l'esperienza ricorrente ha abbastanza memoria per fornire un campione, addestri l'agente            if esperienza_ricorrente.può_fornire_campione():                esperienze = esperienza_ricorrente.campiona_batch()                agente.apprendi(esperienze)            # Imposta lo stato a prossimo_stato            stato = prossimo_stato                        if fatto:                break                        # Opzionalmente, metti in pausa per mezzo secondo per valutare il modello            # time.sleep(0.5)        agente.salva(f'modelli/modello_{dimensione_griglia}.h5')

Quando hai bisogno di time.sleep o agente.carica, puoi semplicemente decommentarli.

Esecuzione del programmaProvalo! Dovresti essere in grado di addestrare con successo l'agente a completare l'obiettivo fino a un ambiente a griglia di circa 8x8. Una dimensione di griglia molto più grande di questa e la formazione inizia a faticare.

Cerca di vedere quanto grande puoi rendere l'ambiente. Puoi fare alcune cose come aggiungere strati e neuroni alla rete neurale, cambiare epsilon_decay o dare più tempo per l'addestramento. Fare ciò può solidificare la comprensione di ogni parte.

Ad esempio, potresti notare che epsilon raggiunge epsilon_end piuttosto velocemente. Non avere paura di cambiare il epsilon_decay con valori come 0.9998 o 0.99998 se lo desideri.

All'aumentare della dimensione della griglia, lo stato che viene alimentato alla rete diventa esponenzialmente più grande.

Ho incluso una breve sezione bonus alla fine per risolvere questo e per dimostrare che ci sono molti modi in cui puoi rappresentare l'ambiente per l'agente.

9. Conclusioni

Congratulazioni per aver completato questo viaggio esaustivo nel mondo del Reinforcement e Deep Q-Learning!

Anche se c'è sempre di più da coprire, potresti andartene avendo acquisito importanti conoscenze e competenze.

In questa guida hai:

  • Introdotti i concetti fondamentali dell'apprendimento per rinforzo e perché è un'area cruciale nell'ambito dell'IA.
  • Costruito un ambiente semplice, gettando le basi per l'interazione e l'apprendimento dell'agente.
  • Definita l'architettura della rete neurale dell'agente per l'utilizzo con il Deep Q-Learning, consentendo al tuo agente di prendere decisioni in ambienti più complessi rispetto al tradizionale Q-Learning.
  • Compreso perché l'esplorazione è importante prima di sfruttare la strategia appresa e implementato la politica Epsilon-Greedy.
  • Implementato il sistema di ricompensa per guidare l'agente verso l'obiettivo e appreso le differenze tra ricompense dense e sparse.
  • Progettato il meccanismo di esperienza ripetuta, consentendo all'agente di imparare dalle esperienze passate.
  • Acquisito esperienza pratica nell'adattamento della rete neurale, un processo critico in cui l'agente migliora le sue prestazioni in base al feedback dall'ambiente.
  • Unito tutti questi elementi in un ciclo di allenamento, osservando il processo di apprendimento dell'agente in azione e ottimizzandolo per prestazioni ottimali.

Ormai dovresti sentirti sicuro nella tua comprensione del Reinforcement Learning e del Deep Q-Learning. Hai costruito una solida base, non solo in teoria ma anche in applicazione pratica, costruendo una palestra DRL da zero.

Queste conoscenze ti permettono di affrontare problemi di RL più complessi e aprono la strada a ulteriori esplorazioni in questo eccitante campo dell'IA.

Gif di un gioco ispirato ad Agar.io in cui gli agenti sono incoraggiati a mangiarsi a vicenda per vincere - GIF dell'autore

Qui sopra c'è un gioco a griglia ispirato ad Agar.io in cui gli agenti sono incoraggiati a crescere di dimensioni, spesso mangiandosi a vicenda. Ad ogni passo, l'ambiente è stato rappresentato graficamente utilizzando la libreria Python, Matplotlib. Le caselle intorno agli agenti sono il loro campo visivo. Questo viene fornito loro come stato dall'ambiente come griglia appiattita, simile a quanto abbiamo fatto nel nostro sistema.

Giochi come questo e una miriade di altri utilizzi possono essere realizzati con semplici modifiche a quanto hai realizzato qui.

Tuttavia, ricorda che il Deep Q-Learning è adatto solo per uno spazio delle azioni discreto - uno che ha un numero finito di azioni distinte. Per uno spazio delle azioni continuo, come in un ambiente basato sulla fisica, sarà necessario esplorare altri metodi nel mondo del DRL.

10. Bonus: Ottimizza la Rappresentazione dello Stato

Credi o non credi, il modo in cui stiamo attualmente rappresentando lo stato non è ottimale per questo utilizzo.

In realtà, è estremamente inefficiente.

Per una griglia di 100x100 ci sono 99.990.000 possibili stati. Non solo il modello dovrebbe essere piuttosto grande considerando la dimensione dell'input - 10.000 valori, ma richiederebbe un notevole volume di dati di addestramento. A seconda delle risorse computazionali disponibili, ciò potrebbe richiedere giorni o settimane.

Un altro svantaggio è la flessibilità. Attualmente il modello è bloccato a una sola dimensione di griglia. Se si desidera utilizzare una griglia di dimensioni diverse, è necessario addestrare completamente un altro modello.

Abbiamo bisogno di un modo per rappresentare lo stato che riduca significativamente lo spazio degli stati e si traduca facilmente in qualsiasi dimensione di griglia.

Il modo miglioreAnche se ci sono diversi modi per fare ciò, il modo più semplice e probabilmente più efficace è utilizzare la distanza relativa dall'obiettivo.

Invece dello stato di una griglia 5x5 che appare così:

[0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0, 0, 0]

Può essere rappresentato con solo due valori:

[-2, -1]

Utilizzando questo metodo si ridurrebbe lo spazio degli stati di una griglia 100x100 da 99.990.000 a 39.601!

Non solo, ma può generalizzare molto meglio. Deve semplicemente imparare che muoversi verso il basso è la scelta giusta quando il primo valore è negativo, e muoversi verso destra è appropriato quando il secondo valore è negativo, con le azioni opposte che si applicano per i valori positivi.

Ciò consente al modello di esplorare solo una frazione dello spazio degli stati.

Mappa termica 25x25 delle decisioni dell'agente in ogni cella con l'obiettivo al centro - GIF di autore

Qui sopra è illustrata la progressione dell'apprendimento di un modello, addestrato su una griglia 25x25. Mostra il colore delle scelte dell'agente in ogni cella con l'obiettivo al centro.

All'inizio, durante la fase di esplorazione, la strategia dell'agente è completamente errata. Si può notare che sceglie di andare verso l'alto quando si trova sopra l'obiettivo, verso il basso quando si trova sotto, e così via.

Ma in meno di 10 episodi impara una strategia che gli consente di raggiungere l'obiettivo nel minor numero di passi possibile da qualsiasi cella.

Questo si applica anche con l'obiettivo in qualsiasi posizione.

Quattro mappe termiche 25x25 del modello applicato a diverse posizioni di obiettivo - Immagine di autore

E infine riesce a generalizzare molto bene il suo apprendimento.

Mappa termica 201x201 delle decisioni del modello 25x25, mostrando la generalizzazione - Immagine di autore

Questo modello ha visto solo una griglia 25x25, ma potrebbe utilizzare la sua strategia su un ambiente molto più grande - 201x201. Con un ambiente di questa dimensione ci sono 1.632.200.400 permutazioni agente-obiettivo!

Aggiorniamo il nostro codice con questa miglioramento radicale.

ImplementazioneNon c'è molto da fare per far funzionare tutto questo, fortunatamente.

La prima cosa da fare è aggiornare get_state in Environment.

def get_state(self):    # Calcola la distanza in righe e la distanza in colonne    distanza_relativa = (self.posizione_agente[0] - self.posizione_obiettivo[0],                         self.posizione_agente[1] - self.posizione_obiettivo[1])        # Scompatta la tupla in un array NumPy    stato = np.array([*distanza_relativa])    return stato

Al posto di una versione appiattita della griglia, calcoliamo la distanza dal obiettivo e la restituiamo come un array NumPy. L'operatore * scompatta semplicemente la tupla in componenti individuali. Avrà lo stesso effetto di fare questo - stato = np.array([distanza_relativa[0], distanza_relativa[1]]).

Inoltre, in move_agent possiamo aggiornare la penalizzazione per colpire il confine in modo che sia uguale a muoversi lontano dall'obiettivo. In questo modo, quando si cambia la dimensione della griglia, l'agente non verrà scoraggiato dal muoversi al di fuori di dove è stato addestrato inizialmente.

def move_agent(self, azione):    ...    else:        # La stessa penalizzazione per una mossa non valida        ricompensa = -1.1            return ricompensa, fatto

Aggiornare l'architettura neuraleAttualmente il nostro modello TensorFlow è così. Ho escluso tutto il resto per semplicità.

class Agente:    def __init__(self, dimensione_griglia, ...):        self.dimensione_griglia = dimensione_griglia        ...        self.modello = self.crea_modello()    def crea_modello(self):        # Creare un modello sequenziale con 3 livelli        modello = Sequential([            # Il livello di input si aspetta una griglia appiattita, quindi la forma di input è la dimensione_griglia al quadrato            Dense(128, activation='relu', input_shape=(self.dimensione_griglia**2,)),            Dense(64, activation='relu'),            # Il livello di output con 4 unità per le azioni possibili (su, giù, sinistra, destra)            Dense(4, activation='linear')        ])        modello.compile(optimizer='adam', loss='mse')        return modello    ...

Se ti ricordi, la nostra architettura del modello ha bisogno di un input coerente. In questo caso, la dimensione dell'input si basava su grid_size.

Con la nostra rappresentazione aggiornata dello stato, ogni stato avrà solo due valori, indipendentemente da grid_size. Possiamo aggiornare il modello per aspettarsi questo. Inoltre, possiamo rimuovere del tutto self.grid_size perché la classe Agent non dipende più da esso.

class Agent:    def __init__(self, ...):        ...        self.model = self.build_model()    def build_model(self):        # Crea un modello sequenziale con 3 strati        model = Sequential([            # Il livello di input prevede una griglia appiattita, quindi la forma di input è grid_size al quadrato            Dense(64, activation='relu', input_shape=(2,)),            Dense(32, activation='relu'),            # Strato di output con 4 unità per le azioni possibili (su, giù, sinistra, destra)            Dense(4, activation='linear')        ])        model.compile(optimizer='adam', loss='mse')        return model    ...

Il parametro input_shape si aspetta una tupla che rappresenti lo stato dell'input.

(2,) specifica un array unidimensionale con due valori. Sembra qualcosa del genere:

[-2, 0]

Mentre (2,1), ad esempio, specifica un array bidimensionale con due righe e una colonna. Sembra qualcosa del genere:

[[-2], [0]]

Infine, abbiamo ridotto il numero di neuroni nei nostri strati nascosti a 64 e 32 rispettivamente. Con questa semplice rappresentazione dello stato, probabilmente è ancora eccessivo, ma dovrebbe essere abbastanza veloce.

Quando inizi ad allenarti, prova a vedere quanti pochi neuroni hai bisogno affinché il modello impari efficacemente. Puoi anche provare a rimuovere il secondo strato se vuoi.

Risolvere il ciclo principale di allenamentoIl ciclo di allenamento richiede pochi aggiustamenti. Aggiorniamolo per abbinare le nostre modifiche.

from environment import Environmentfrom agent import Agentfrom experience_replay import ExperienceReplayimport timeif __name__ == '__main__':    grid_size = 5    environment = Environment(grid_size=grid_size, render_on=True)    agent = Agent(epsilon=1, epsilon_decay=0.998, epsilon_end=0.01)    # agent.load(f'models/model.h5')    experience_replay = ExperienceReplay(capacity=10000, batch_size=32)        # Numero di episodi da eseguire prima che l'allenamento si fermi    episodes = 5000    # Numero massimo di passi in ogni episodio    max_steps = 200    for episode in range(episodes):        # Ottieni lo stato iniziale dell'ambiente e imposta done su False        state = environment.reset()        # Loop finché l'episodio non finisce        for step in range(max_steps):            print('Episodio:', episode)            print('Passo:', step)            print('Epsilon:', agent.epsilon)            # Ottieni la scelta di azione dalla politica degli agenti            action = agent.get_action(state)            # Fai un passo nell'ambiente e salva l'esperienza            reward, next_state, done = environment.step(action)            experience_replay.add_experience(state, action, reward, next_state, done)            # Se l'esperienza di replay ha abbastanza memoria per fornire un campione, addestra l'agente            if experience_replay.can_provide_sample():                experiences = experience_replay.sample_batch()                agent.learn(experiences)            # Imposta lo stato a next_state            state = next_state                        if done:                break            # Opzionalmente, metti in pausa per mezzo secondo per valutare il modello            # time.sleep(0.5)        agent.save(f'models/model.h5')

Poiché agent non ha più bisogno di grid_size, possiamo rimuoverlo per evitare eventuali errori.

Non dobbiamo nemmeno più dare al modello nomi diversi per ogni grid_size, poiché un modello ora funziona su qualsiasi dimensione.

Se sei curioso riguardo a ExperienceReplay, rimarrà lo stesso.

Si noti che non esiste una rappresentazione dello stato universale. In alcuni casi, potrebbe avere senso fornire l'intera griglia come abbiamo fatto noi, o una sua sottosezione come ho fatto con il sistema multi-agente nella sezione 9. L'obiettivo è trovare un equilibrio tra la semplificazione dello spazio degli stati e la fornitura di informazioni adeguate per l'apprendimento dell'agente.

Iper-parametriAnche un ambiente semplice come il nostro richiede modifiche agli iper-parametri. Ricorda che questi sono i valori che possiamo modificare che influenzano l'addestramento.

Abbiamo discusso di ognuno dei seguenti punti:

  • epsilon, epsilon_decay, epsilon_end (esplorazione/sfruttamento)
  • gamma (fattore di sconto)
  • numero di neuroni e livelli
  • batch_size, capacity (esperienza ripetuta)
  • max_steps

Ci sono molti altri, ma ne discuteremo solo un altro che sarà fondamentale per l'apprendimento.

Tasso di apprendimento Il Tasso di Apprendimento (LR) è un iper-parametro del modello di rete neurale.

In sostanza, indica alla rete neurale quanto deve regolare i suoi pesi, ossia i valori utilizzati per la trasformazione degli input, ogni volta che viene adattata ai dati.

I valori del LR di solito vanno da 1 fino a 0,0000001, con i valori più comuni come 0,01, 0,001 e 0,0001.

Una Tasso di Apprendimento sub-ottimale che potrebbe non convergere mai su una strategia ottimale - Immagine dell'autore

Se il tasso di apprendimento è troppo basso, potrebbe non aggiornare abbastanza velocemente i valori Q per imparare una strategia ottimale, un processo noto come convergenza. Se noti che vi è una stagnazione nell'apprendimento, o nulla affatto, potrebbe essere un segno che il tasso di apprendimento non è sufficientemente alto.

Anche se questi diagrammi sul tasso di apprendimento sono molto semplificati, dovrebbero dare un'idea generale.

Una Tasso di Apprendimento sub-ottimale che fa crescere esponenzialmente i valori Q - Immagine dell'autore

D'altra parte, un tasso di apprendimento troppo alto può causare un "esplosione" dei valori o un loro aumento sempre più grande. Gli adeguamenti che il modello fa sono troppo grandi, causando la divergenza o il peggioramento nel tempo.

Qual è il tasso di apprendimento perfetto? Quanto è lungo un pezzo di spago?

In molti casi bisogna semplicemente provare e sbagliare. Un buon modo per determinare se il tasso di apprendimento è un problema è controllare l'output del modello.

Questo è esattamente il problema che ho affrontato durante l'addestramento di questo modello. Dopo aver passato alla rappresentazione semplificata dello stato, ha rifiutato di imparare. L'agente continuava effettivamente ad andare in basso a destra della griglia dopo aver testato approfonditamente ciascun iper-parametro.

Non aveva senso per me, quindi ho deciso di dare un'occhiata ai valori Q output dal modello nel metodo get_action dell'Agente.

Step 10[[ 0.29763165 0.28393078 -0.01633328 -0.45749056]]Step 50[[ 7.173178 6.3558702 -0.48632553 -3.1968129 ]]Step 100[[ 33.015953 32.89661 33.11674 -14.883122]]Step 200[[573.52844 590.95685 592.3647 531.27576]]...Step 5000[[37862352. 34156752. 35527612. 37821140.]]

Questo è un esempio di valori esplosivi.

In TensorFlow, l'ottimizzatore che stiamo usando per regolare i pesi, Adam, ha un tasso di apprendimento predefinito di 0,001. In questo caso specifico era troppo alto.

Tasso di apprendimento bilanciato, convergendo infine alla strategia ottimale - Immagine dell'autore

Dopo aver testato vari valori, sembra che il valore ottimale sia 0.00001.

Implementiamo questo.

from tensorflow.keras.optimizers import Adamdef build_model(self):    # Crea un modello sequenziale con 3 layer    model = Sequential([        # Il layer di input si aspetta una griglia appiattita, quindi la forma di input è il quadrato della dimensione della griglia        Dense(64, activation='relu', input_shape=(2,)),        Dense(32, activation='relu'),        # Layer di output con 4 unità per le possibili azioni (alto, basso, sinistra, destra)        Dense(4, activation='linear')    ])         # Aggiorna il tasso di apprendimento    optimizer = Adam(learning_rate=0.00001)    # Compila il modello con l'ottimizzatore personalizzato    model.compile(optimizer=optimizer, loss='mse')    return model

Sentiti libero di adattarlo e osserva come vengono influenzati i valori Q. Assicurati anche di importare Adam.

Infine, puoi iniziare di nuovo ad addestrare!

Codice per la heat-mapDi seguito è riportato il codice per creare la tua heat-map come mostrato in precedenza, se sei interessato.

import matplotlib.pyplot as pltimport numpy as npfrom tensorflow.keras.models import load_modeldef generate_heatmap(episode, grid_size, model_path):    # Carica il modello    model = load_model(model_path)        goal_location = (grid_size // 2, grid_size // 2)  # Centro della griglia    # Inizializza un array per memorizzare le intensità dei colori    heatmap_data = np.zeros((grid_size, grid_size, 3))    # Definisci i colori per ogni azione    colors = {        0: np.array([0, 0, 1]),  # Blu per l'alto        1: np.array([1, 0, 0]),  # Rosso per il basso        2: np.array([0, 1, 0]),  # Verde per sinistra        3: np.array([1, 1, 0])   # Giallo per destra    }    # Calcola i valori Q per ogni stato e determina l'intensità del colore    for x in range(grid_size):        for y in range(grid_size):            relative_distance = (x - goal_location[0], y - goal_location[1])            state = np.array([*relative_distance]).reshape(1, -1)            q_values = model.predict(state)            best_action = np.argmax(q_values)            if (x, y) == goal_location:                heatmap_data[x, y] = np.array([1, 1, 1])            else:                heatmap_data[x, y] = colors[best_action]    # Plot della heat-map    plt.imshow(heatmap_data, interpolation='nearest')    plt.xlabel(f'Episodio: {episode}')    plt.axis('off')    plt.tight_layout(pad=0)    plt.savefig(f'./figures/heatmap_{grid_size}_{episode}', bbox_inches='tight')

Basta importarlo nel ciclo di addestramento e eseguirlo con la frequenza desiderata.

Prossimi passiUna volta addestrato in modo efficace il tuo modello e sperimentato con gli iperparametri, ti incoraggio a renderlo davvero tuo.

Alcune idee per espandere il sistema:

  • Aggiungi ostacoli tra l'agente e l'obiettivo
  • Crea un ambiente più vario, eventualmente con stanze e percorsi generati casualmente
  • Implementa un sistema di cooperazione/competizione multi-agente — nascondino
  • Crea un gioco ispirato a Pong
  • Implementa la gestione delle risorse come fame o energia, dove l'agente deve raccogliere cibo lungo il percorso verso l'obiettivo

Ecco un esempio che va oltre il nostro semplice sistema di griglia:

Gioco ispirato a Flappy Bird in cui l'agente deve evitare i tubi per sopravvivere — GIF di autore

Utilizzando Pygame, una popolare libreria Python per creare giochi 2D, ho costruito un clone di Flappy Bird. Poi ho definito le interazioni, i vincoli e la struttura dei premi nella nostra classe predefinita Environment.

Ho rappresentato lo stato come la velocità e la posizione corrente dell'agente, la distanza dal tubo più vicino e la posizione dell'apertura.

Per la classe Agent ho semplicemente aggiornato la dimensione di input a (4,), aggiunto più layer alla rete neurale e aggiornato la rete in modo da produrre solo due valori — saltare o non saltare.

Puoi trovare e eseguire questo nella directory flappy_bird nel repo di GitHub. Assicurati di eseguire pip install pygame.

Questo mostra che quello che hai creato è applicabile a una varietà di ambienti. Puoi persino far esplorare all'agente un ambiente in 3D o eseguire compiti più astratti come il trading di azioni.

Mentre espandi il tuo sistema, non aver paura di essere creativo con il tuo ambiente, la rappresentazione dello stato e il sistema di ricompense. Come l'agente, impariamo meglio attraverso l'esplorazione!

Spero che la creazione di una palestra DRL da zero abbia aperto i tuoi occhi alla bellezza dell'IA e ti abbia ispirato a approfondire.

Questo articolo è stato ispirato dal libro Neural Networks From Scratch In Python e dalla serie di YouTube di Harrison Kinsley (sentdex) e Daniel Kukieł. Lo stile di conversazione e le implementazioni di codice da zero hanno davvero consolidato la mia comprensione delle reti neurali.