Caso d’uso del classificatore a due teste

Il caso d'uso del classificatore a due teste una soluzione vincente

Foto di Vincent van Zalinge su Unsplash

Idea

Parliamo di alcuni casi reali di compiti di visione artificiale. A prima vista, il problema della classificazione è semplice, ed è vero. Ma nel mondo reale spesso hai molte restrizioni come: velocità del modello, dimensione, capacità di eseguirlo su un dispositivo mobile. Inoltre, probabilmente avrai diversi compiti e non è la migliore idea avere un modello separato per ogni compito. Almeno quando puoi ottimizzare l’architettura del tuo sistema e utilizzare meno modelli, dovresti farlo. Ma ovviamente, non vuoi perdere accuratezza, vero? Quindi, quando consideri tutti i vincoli e le ottimizzazioni, il tuo compito diventa più complesso. Voglio mostrarti un esempio di un problema di classificazione con diverse classi che, visivamente, potrebbero non essere così simili.

Inizierò con un compito semplice: classificare se un’immagine è un vero documento cartaceo o se è un’immagine di uno schermo con qualche documento su di esso. Potrebbe essere un tablet / telefono o un grande monitor.

Documento reale
Schermo

E questo è piuttosto semplice. Inizi con un dataset, lo raccogli in modo che sia rappresentativo, pulito e abbastanza grande. Poi prendi un modello che funziona con i tuoi vincoli (velocità, accuratezza, esportabilità) e utilizzi una semplice pipeline di formazione, prestare attenzione ai dati sbilanciati. Questo dovrebbe darti risultati abbastanza buoni.

Ma diciamo che ora devi aggiungere una nuova funzionalità in modo che il tuo modello possa classificare se l’input in arrivo è un’immagine di un documento o qualcosa che non è un documento, come un sacchetto di patatine / lattina o del materiale di marketing. E questo compito non è così importante come quello originale, e nemmeno così difficile.

Non un documento

Ecco la struttura del nostro dataset:

dataset/├── documents/│   ├── img_1.jpg|   ...│   └── img_100.jpg├── screens/│   ├── img_1.jpg|   ...│   └── img_100.jpg├── not a documents/│   ├── img_1.jpg│   ...│   └── img_100.jpg├── train.csv├── val.csv└── test.csv

E la struttura del file csv:

documents/img_1.jpg      | 0not a document/img_1.jpg | 1screens/img_1.jpg        | 2...

La prima colonna contiene un percorso relativo all’immagine, la seconda colonna – l’id della classe. Ora parliamo di due approcci per risolvere questo compito.

Approccio con tre neuroni di output (semplice)

Poiché vogliamo avere un’architettura di sistema ottimale, non creeremo un nuovo modello che sia di nuovo un classificatore binario per ogni piccolo compito. La prima idea che viene in mente è quella di aggiungere questa (classe non di documento) come terza classe al nostro modello originale, in modo da avere classi come: ‘documento’, ‘schermo’, ‘non_documento’.

E questa è un’opzione fattibile, ma forse l’importanza di questi compiti non è uguale e anche visivamente queste classi potrebbero non essere così simili, e potresti volere caratteristiche leggermente diverse estratte per il tuo strato di classificazione. Inoltre, non dimenticare che è molto importante non perdere accuratezza del compito originale.

Due testa con approccio di classificazione binaria (personalizzato)

Un altro approccio sarebbe quello di utilizzare principalmente una colonna vertebrale e due teste con classificazione binaria, una testa per ogni compito. In questo modo avremo 1 modello per 2 compiti, ogni compito sarà separato e avremo molto controllo su ciascun compito.

La velocità praticamente non ne risentirà (ho ottenuto un’infrazione più lenta di ~ 5-7% su 1 immagine con 3060), la dimensione del modello diventerà un po’ più grande (nel mio caso dopo l’esportazione in TFLlite è passato da 500 kB a 700 kB). Un’altra cosa comoda per il nostro caso sarebbe pesare le nostre perdite, quindi la perdita della prima testa ha un peso N volte maggiore rispetto alla perdita della seconda testa. In questo modo possiamo essere sicuri che il nostro focus sia sul primo (principale) compito e sia meno probabile perdere precisione su di esso.

Ecco come appare:

Output a due teste

Sto utilizzando SuffleNetV2 per questo compito e ho diviso l’architettura in due parti, a partire dall’ultimo livello convoluzionale. Ogni testa ha il proprio ultimo livello convoluzionale, pooling globale e livello completamente connesso per la classificazione.

Esempi di codice

Ora che comprendiamo l’architettura del modello, è chiaro che dobbiamo apportare alcune modifiche alla nostra pipeline di addestramento, a partire dal generatore di dataset. Durante la scrittura del codice per il dataset e il dataloader, ora dobbiamo restituire 1 immagine e 2 etichette per ogni iterazione. La prima etichetta verrà utilizzata per la prima testa e la seconda per la seconda testa, diamo un’occhiata a un esempio di codice:

class CustomDataset(Dataset):    def __init__(        self,        root_path: Path,        split: pd.DataFrame,        train_mode: bool,    ) -> None:        self.root_path = root_path        self.split = split        self.img_size = (256, 256)        self.norm = ([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])        self._init_augs(train_mode)    def _init_augs(self, train_mode: bool) -> None:        if train_mode:            self.transform = transforms.Compose(                [                    transforms.Resize(self.img_size),                    transforms.Lambda(self._convert_rgb),                    transforms.RandomRotation(10),                    transforms.ToTensor(),                    transforms.Normalize(*self.norm),                ]            )        else:            self.transform = transforms.Compose(                [                    transforms.Resize(self.img_size),                    transforms.Lambda(self._convert_rgb),                    transforms.ToTensor(),                    transforms.Normalize(*self.norm),                ]            )    def _convert_rgb(self, x: torch.Tensor) -> torch.Tensor:        return x.convert("RGB")    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, int, int]:        image_path, label = self.split.iloc[idx]        image = Image.open(self.root_path / image_path)        image.draft("RGB", self.img_size)        image = ImageOps.exif_transpose(image)  # fix rotation        image = self.transform(image)        label_lcd = int(label == 2)        label_other = int(label == 1)        return image, label_lcd, label_other    def __len__(self) -> int:        return len(self.split)

Siamo interessati solo a __getitem__, dove dividiamo label in label_lcd e label_other (le nostre 2 teste). label_lcd è 1 per uno ‘schermo’ e 0 per gli altri casi. label_other è 1 per ‘non un documento’ e 0 per gli altri casi.

Per la nostra architettura, abbiamo quanto segue:

class CustomShuffleNet(nn.Module):    def __init__(self, n_outputs_1: int, n_outputs_2: int) -> None:        super(CustomShuffleNet, self).__init__()        self.base_model = models.shufflenet_v2_x0_5(            weights=models.ShuffleNet_V2_X0_5_Weights.DEFAULT        )        # Create head convolution layers        self.head1_conv = self._create_head_conv()        self.head2_conv = self._create_head_conv()        # Create fully connected layers for both heads        in_features = self.base_model.fc.in_features        del self.base_model.fc        self.fc1 = nn.Linear(in_features, n_outputs_1)        self.fc2 = nn.Linear(in_features, n_outputs_2)    def _create_head_conv(self) -> nn.Module:        return nn.Sequential(            nn.Conv2d(192, 1024, kernel_size=1, stride=1, bias=False),            nn.BatchNorm2d(1024),            nn.ReLU(inplace=True),        )    def forward(self, x: torch.Tensor) -> torch.Tensor:        x = self.base_model.conv1(x)        x = self.base_model.maxpool(x)        x = self.base_model.stage2(x)        x = self.base_model.stage3(x)        x = self.base_model.stage4(x)        # Pass through the separate convolutions for each head        x1 = self.head1_conv(x)        x1 = x1.mean([2, 3])  # globalpool for first head        x2 = self.head2_conv(x)        x2 = x2.mean([2, 3])  # globalpool for second head        out1 = self.fc1(x1)        out2 = self.fc2(x2)        return out1, out2

Dalla last conv layer (inclusa), l’architettura è divisa in due rami paralleli. Ora il modello ha 2 output, come richiesto.

Loop di addestramento:

def train(    train_loader: DataLoader,    val_loader: DataLoader,    device: str,    model: nn.Module,    loss_func: nn.Module,    optimizer: torch.optim.Optimizer,    scheduler: torch.optim.lr_scheduler,    epochs: int,    path_to_save: Path,) -> None:    best_metric = 0    wandb.watch(model, log_freq=100)    for epoch in range(1, epochs + 1):        model.train()        with tqdm(train_loader, unit="batch") as tepoch:            for inputs, labels_1, labels_2 in tepoch:                inputs, labels_1, labels_2 = (                    inputs.to(device),                    labels_1.to(device),                    labels_2.to(device),                )                tepoch.set_description(f"Epoch {epoch}/{epochs}")                optimizer.zero_grad()                outputs_1, outputs_2 = model(inputs)                loss_1 = loss_func(outputs_1, labels_1)                loss_2 = loss_func(outputs_2, labels_2)                loss = 2 * loss_1 + loss_2                loss.backward()                optimizer.step()                tepoch.set_postfix(loss=loss.item())        metrics = evaluate(            test_loader=val_loader, model=model, device=device, mode="val"        )        if scheduler is not None:            scheduler.step()        if metrics["f1_1"] > best_metric:            best_metric = metrics["f1_1"]            print("Salvataggio del nuovo miglior modello...")            path_to_save.parent.mkdir(parents=True, exist_ok=True)            torch.save(model.state_dict(), path_to_save)        wandb_logger(loss, metrics, mode="val")

Otteniamo immagine, label_1, label_2 dal dataset, facciamo passare l’immagine (in realtà un batch) attraverso il modello, quindi calcoliamo le perdite 2 volte (una volta per ogni output del ramo). Moltiplichiamo la nostra perdita principale per 2 per concentrarci sul nostro ramo “principale”. Certamente, dobbiamo modificare cose come il calcolo delle metriche per adattare il nostro modello a due rami (puoi trovare un esempio completo nel repo). E cosa importante – salviamo il nostro modello in base alla metrica ottenuta dal nostro ramo “principale”.

Risultati

Non ha senso confrontare i punteggi F1 dalla pipeline di addestramento, poiché vengono calcolati per 3 e 2 classi e siamo interessati alle metriche separatamente. Ecco perché ho usato un dataset di test specifico, ho eseguito entrambi i modelli e ho confrontato precision e recall per il compito document/screen e document/not_document separatamente.

Entrambi i modelli utilizzano una dimensione di input di 256×256, ma ho anche aggiunto una versione di un approccio con 3 neuroni di output semplice con una dimensione di input di 320×320, poiché il tempo di inferenza era praticamente lo stesso di quello del modello a due rami, quindi era interessante confrontare. Il secondo compito ha dato risultati esattamente uguali per entrambi gli approcci (poiché è un compito facile per il mio modello), ma ci sono differenze con il compito principale.

+----------------------------+-----------+-----------+--------------+|      Model (img size)      | Precision |  Recall   | Latency (s)* |+----------------------------+-----------+-----------+--------------+| Three output neurons (256) |     0.993 | 0.855     |        0.027 || Three output neurons (320) |       1.0 | 0.846     |        0.029 || Two heads (256)            |       1.0 | 0.873     |        0.029 |+----------------------------+-----------+-----------+--------------+

Latency (s)* – tempo medio di inferenza su 1 immagine, comprese le trasformazioni e la softmax.

Ecco l’impulso che cercavamo! Il modello a due rami ha gli stessi punteggi per il compito secondario, ma per il compito principale ha una precision uguale o migliore e un recall più alto. E questo è con dati del mondo reale (non da divisioni di addestramento/validazione/test).

Nota: per questo compito non solo c’è un compito più importante (document/screen), ma anche la precision è più importante del recall, quindi nell’approccio con 3 neuroni di output e dimensione di input 320 vince. Ma alla fine, il modello a due rami ottiene comunque punteggi migliori con lo stesso tempo di inferenza.

Un’ulteriore cosa importante. Questo approccio ha funzionato meglio nel mio caso con un modello e dati specifici. Ha funzionato anche per alcuni altri compiti, ma è fondamentale creare sempre ipotesi ed eseguire esperimenti per testarle e trovare il miglior approccio. Per questo, consiglio di utilizzare strumenti per salvare le configurazioni e i risultati degli esperimenti. Qui ho usato Hydra per le configurazioni e Wandb per il tracciamento degli esperimenti.

Per riassumere

  • La classificazione è facile, ma diventa più difficile con tutti i vincoli del mondo reale
  • Optimizza i sottocompiti e cerca di non creare K modelli per ogni grande compito
  • Personalizza i modelli e le pipeline di addestramento per avere un maggiore controllo
  • Testa le tue ipotesi, esegui esperimenti e salva i risultati (hydra, wandb…)

E questo è praticamente tutto, puoi trovare un esempio di codice completo qui, così potrai eseguire i test da solo. Non esitare a contattarmi se hai domande o suggerimenti!