Implementare un modello di machine learning personalizzato come endpoint di SageMaker

Implementazione di un modello di machine learning personalizzato come endpoint di SageMaker

Foto di Ricardo Gomez Angel su Unsplash

Deployment dell’Endpoint SageMaker

Una guida rapida e semplice per creare un endpoint SageMaker AWS per il tuo modello

Sviluppare un modello di machine learning (ML) comporta passaggi chiave, dalla raccolta dei dati alla distribuzione del modello. Dopo aver affinato gli algoritmi e garantito le prestazioni attraverso i test, l’ultimo passaggio cruciale è la distribuzione. Questa fase trasforma l’innovazione in utilità, consentendo agli altri di beneficiare delle capacità predictive del modello. Il modello di ML distribuito colma il divario tra lo sviluppo e l’impatto nel mondo reale, fornendo vantaggi tangibili agli utenti e agli stakeholder.

Questa guida copre i passaggi di base necessari per sviluppare un endpoint SageMaker ML personalizzato. A questo punto, si presume che tu abbia già un modello funzionante e desideri esporlo al resto del mondo tramite un endpoint. La guida ti aiuterà a distribuire un modello basato su PyTorch che mira a prevedere anomalie in video clip. Il modello, anche conosciuto come AI VAD, si basa sul paper “Attribute-based Representations for Accurate and Interpretable Video Anomaly Detection”, e la sua implementazione può essere trovata nel repository GitHub di anomalib di OpenVINO. Per saperne di più su questo interessante approccio, scorri fino alla fine di questo articolo nella sezione Appendice.

A questo punto, desidero sottolineare che in questo caso non possiamo utilizzare l’astrazione PyTorchModel specificamente creata per distribuire modelli PyTorch per due motivi. Il primo motivo è che abbiamo il pacchetto anomalib come dipendenza aggiuntiva che non è inclusa nell’immagine pre-costruita di PyTorch Sagemaker. Il secondo motivo è che il modello richiede informazioni aggiuntive apprese durante la fase di addestramento che non fanno parte dei pesi del modello PyTorch.

Ecco i passaggi per raggiungere questo obiettivo:

  1. Scrivi lo script di servizio del modello SageMaker
  2. Carica il modello su S3
  3. Carica un’immagine Docker personalizzata in AWS ECR
  4. Crea un Modello in SageMaker
  5. Crea una Configurazione dell’Endpoint
  6. Crea un Endpoint
  7. Invocare l’Endpoint

Scrivi lo script di servizio del modello SageMaker

Lo script di servizio del modello SageMaker (inference.py) è un componente importante nella creazione di un modello SageMaker. Funge da ponte tra i modelli di machine learning e i dati del mondo reale. Essenzialmente, elabora le richieste in arrivo, esegue le previsioni del modello e restituisce i risultati. Influenza quindi il processo decisionale di un’applicazione.

Lo script inference.py è composto da diversi metodi chiave, ognuno con uno scopo unico, che facilita collettivamente il processo di servizio del modello. Di seguito elenco i quattro principali.

  1. Il metodo model_fn è responsabile del caricamento del modello addestrato. Legge gli artefatti del modello che sono stati salvati e restituisce un oggetto modello che può essere utilizzato per le previsioni. Questo metodo viene chiamato solo una volta quando il server del modello SageMaker viene avviato.
  2. Il metodo input_fn prende i dati della richiesta e li formatta in una forma adatta per effettuare previsioni. Ad esempio, nel codice sottostante questa funzione formatta i dati in modo diverso in base all’origine dei dati (byte dell’immagine o elenco di URI S3) e se l’elenco di frame deve essere considerato come una singola clip video.
  3. Il metodo predict_fn prende i dati della richiesta formattati e esegue l’inférence sul modello caricato.
  4. Infine, viene utilizzato il metodo output_fn. Prende il risultato della previsione e lo formatta in un messaggio di risposta. Ad esempio, lo incapsula in un oggetto JSON.

Di seguito puoi trovare il codice per lo script di servizio del modello Sagemaker.

import osimport jsonimport joblibimport torchfrom PIL import Imageimport numpy as npimport ioimport boto3from enum import Enumfrom urllib.parse import urlsplitfrom omegaconf import OmegaConffrom anomalib.data.utils import read_image, InputNormalizationMethod, get_transformsfrom anomalib.models.ai_vad.torch_model import AiVadModeldevice = "cuda"class PredictMode(Enum):    frame = 1    batch = 2    clip = 3    def model_fn(model_dir):    """    Questa funzione è la prima ad essere eseguita al momento di una richiesta di previsione,    carica il modello dal disco e restituisce l'oggetto modello che verrà successivamente utilizzato per l'elaborazione.    """    # Carica il file di configurazione    config = OmegaConf.load(os.path.join(model_dir, "ai_vad_config.yaml"))    config_model = config.model    # Carica il modello    model = AiVadModel(            box_score_thresh=config_model.box_score_thresh,            persons_only=config_model.persons_only,            min_bbox_area=config_model.min_bbox_area,            max_bbox_overlap=config_model.max_bbox_overlap,            enable_foreground_detections=config_model.enable_foreground_detections,            foreground_kernel_size=config_model.foreground_kernel_size,            foreground_binary_threshold=config_model.foreground_binary_threshold,            n_velocity_bins=config_model.n_velocity_bins,            use_velocity_features=config_model.use_velocity_features,            use_pose_features=config_model.use_pose_features,            use_deep_features=config_model.use_deep_features,            n_components_velocity=config_model.n_components_velocity,            n_neighbors_pose=config_model.n_neighbors_pose,            n_neighbors_deep=config_model.n_neighbors_deep,        )    # Carica i pesi del modello    model.load_state_dict(torch.load(os.path.join(model_dir, "ai_vad_weights.pth"), map_location=device), strict=False)    # Carica le memory banks    velocity_estimator_memory_bank, pose_estimator_memory_bank, appearance_estimator_memory_bank = joblib.load(os.path.join(model_dir, "ai_vad_banks.joblib"))     if velocity_estimator_memory_bank is not None:        model.density_estimator.velocity_estimator.memory_bank = velocity_estimator_memory_bank    if pose_estimator_memory_bank is not None:        model.density_estimator.pose_estimator.memory_bank = pose_estimator_memory_bank    if appearance_estimator_memory_bank is not None:        model.density_estimator.appearance_estimator.memory_bank = appearance_estimator_memory_bank    model.density_estimator.fit()    # Sposta l'intero modello sulla periferica    model = model.to(device)    # Ottieni le trasformazioni    transform_config = config.dataset.transform_config.eval if "transform_config" in config.dataset.keys() else None    image_size = (config.dataset.image_size[0], config.dataset.image_size[1])    center_crop = config.dataset.get("center_crop")    center_crop = tuple(center_crop) if center_crop is not None else None    normalization = InputNormalizationMethod(config.dataset.normalization)    transform = get_transforms(config=transform_config, image_size=image_size, center_crop=center_crop, normalization=normalization)    return model, transformdef input_fn(request_body, request_content_type):    """    La request_body viene passata da SageMaker e il tipo di contenuto viene passato     tramite un'intestazione HTTP dal client (o chiamante).    """    print("input_fn-----------------------")    if request_content_type in ("application/x-image", "image/x-image"):        image = Image.open(io.BytesIO(request_body)).convert("RGB")        numpy_array = np.array(image)        print("numpy_array.shape", numpy_array.shape)        print("input_fn-----------------------")        return [numpy_array], PredictMode.frame    elif request_content_type == "application/json":        request_body_json = json.loads(request_body)        s3_uris = request_body_json.get("images", [])        if len(s3_uris) == 0:            raise ValueError(f"Images è un campo obbligatorio e dovrebbe contenere almeno un URI S3 nella lista")        s3 = boto3.client("s3")        frame_paths = []        for s3_uri in s3_uris:            parsed_url = urlsplit(s3_uri)            bucket_name = parsed_url.netloc            object_key = parsed_url.path.lstrip('/')            local_frame_path = f"/tmp/{s3_uri.replace('/', '_')}"            # Scarica il frame da S3            s3.download_file(bucket_name, object_key, local_frame_path)            frame_paths.append(local_frame_path)        frames = np.stack([torch.Tensor(read_image(frame_path)) for frame_path in frame_paths], axis=0)                predict_mode = PredictMode.clip if request_body_json.get("clip", False) else PredictMode.batch                print("frames.shape", frames.shape)        print("predict_mode", predict_mode)        print("input_fn-----------------------")        return frames, predict_mode    # Se il tipo di contenuto non è quello previsto, genera un'eccezione    raise ValueError(f"Il tipo di contenuto {request_content_type} non è supportato")def predict_fn(input_data, model):    """    Questa funzione prende in input i dati di input e il modello restituito dalla model_fn    Viene eseguita dopo la model_fn e il suo output viene restituito come risposta API.    """    print("predict_fn-----------------------")    model, transform = model        frames, predict_mode = input_data    processed_data = {}    processed_data["image"] = [transform(image=frame)["image"] for frame in frames]    processed_data["image"] = torch.stack(processed_data["image"])    image = processed_data["image"].to(device)        # Aggiungi un'altra dimensione per una dimensione di batch di un clip    if predict_mode == PredictMode.clip:        image = image.unsqueeze(0)    print("image.shape", image.shape)    model.eval()    with torch.no_grad():        boxes, anomaly_scores, image_scores = model(image)    print("boxes_len", [len(b) for b in boxes])    processed_data["pred_boxes"] = [box.int() for box in boxes]    processed_data["box_scores"] = [score.to(device) for score in anomaly_scores]    processed_data["pred_scores"] = torch.Tensor(image_scores).to(device)    print("predict_fn-----------------------")    return processed_datadef output_fn(prediction, accept):    """    Funzione di post-elaborazione per le previsioni del modello. Viene eseguita dopo predict_fn.    """    print("output_fn-----------------------")    # Controlla se il tipo di accettazione è JSON    if accept != "application/json":        raise ValueError(f"Il tipo di accettazione {accept} non è supportato")    # Converti i tensori PyTorch in liste in modo che possano essere serializzati in JSON    for key in prediction:        # Se è un torch.Tensor, convertilo in lista        if isinstance(prediction[key], torch.Tensor):            prediction[key] = prediction[key].tolist()        # Se è una lista, converti ogni tensore nella lista        elif isinstance(prediction[key], list):            prediction[key] = [tensor.tolist() if isinstance(tensor, torch.Tensor) else tensor for tensor in prediction[key]]    print("output_fn-----------------------")    return json.dumps(prediction), accept

P.S. È fortemente consigliato testare lo script di servizio del modello prima di passare al passaggio successivo. Ciò può essere facilmente fatto simulando la pipeline di invocazione come mostrato nel codice sottostante.

import json
from inference import model_fn, predict_fn, input_fn, output_fn

response, accept = output_fn(
    predict_fn(
        input_fn(payload, "application/x-image"),
        model_fn("../")
    ),
    "application/json"
)

json.loads(response).keys()

Carica il modello su S3

Per creare un endpoint SageMaker che carichi il modello AI VAD PyTorch nello stesso stato esatto, sono necessari i seguenti file:

  • Pesi del modello AI VAD PyTorch (aka state_dict)
  • Banche di memoria dell’estimatore di densità (che non fanno parte dei pesi del modello)
  • Un file di configurazione con gli iperparametri del modello PyTorch
  • Uno script di servizio del modello Sagemaker (inference.py)

Il codice di seguito illustra come organizzare tutti i file richiesti in una singola directory.

P.S., Ho sovrascritto il callback ModelCheckpoint integrato di PyTorch per garantire che queste banche di memoria vengano salvate come parte del salvataggio del checkpoint (l’implementazione può essere trovata qui).

import torch
import joblib
import shutil

checkpoint = "results/ai_vad/ucsd/run/weights/lightning/model.ckpt"
config_path = "results/ai_vad/ucsd/run/config.yaml"

model_weights = torch.load(checkpoint)
model_state_dict = model_weights["state_dict"]
torch.save(model_state_dict, "../ai_vad_weights.pth")

velocity_estimator_memory_bank = None
pose_estimator_memory_bank = None
appearance_estimator_memory_bank = None

if "velocity_estimator_memory_bank" in model_weights:
    velocity_estimator_memory_bank = model_weights["velocity_estimator_memory_bank"]

if "pose_estimator_memory_bank" in model_weights:
    pose_estimator_memory_bank = model_weights["pose_estimator_memory_bank"]

if "appearance_estimator_memory_bank" in model_weights:
    appearance_estimator_memory_bank = model_weights["appearance_estimator_memory_bank"]

banks = (velocity_estimator_memory_bank, pose_estimator_memory_bank, appearance_estimator_memory_bank)
joblib.dump(banks, "../ai_vad_banks.joblib")

shutil.copyfile(config_path, "../ai_vad_config.yaml")

Successivamente, i quattro file sono stati compressi insieme per creare il file tar.gz utilizzando il comando seguente.

tar -czvf ../ai_vad_model.tar.gz -C ../ ai_vad_weights.pth ai_vad_banks.joblib ai_vad_config.yaml inference.py

Infine, il file è stato caricato su S3 utilizzando boto3.

import boto3
from datetime import datetime

current_datetime = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')

s3 = boto3.resource('s3')
s3.meta.client.upload_file("../ai_vad_model.tar.gz", "ai-vad", f"{current_datetime}/ai_vad_model.tar.gz")

Carica un’immagine Docker personalizzata su AWS ECR

Come accennato in precedenza, poiché abbiamo una dipendenza aggiuntiva che non è inclusa nell’immagine predefinita di PyTorch Sagemaker (cioè il pacchetto anomalib), abbiamo creato una nuova immagine Docker a tale scopo. Prima di creare l’immagine Docker personalizzata, è necessaria l’autenticazione al repository Amazon ECR.

REGION=<my_aws_region>
ACCOUNT=<my_aws_account>

# Autentica Docker su un registro Amazon ECR
aws ecr get-login-password --region $REGION | docker login --username AWS --password-stdin <docker_registry_url>.dkr.ecr.$REGION.amazonaws.com

# Accedi al tuo registro Amazon ECR privato
aws ecr get-login-password --region $REGION | docker login --username AWS --password-stdin $ACCOUNT.dkr.ecr.$REGION.amazonaws.com

Il Dockerfile può essere trovato di seguito e i diversi percorsi del registro Docker possono essere trovati qui. Assicurati di selezionare il percorso corretto del registro in base alle esigenze del modello (CPU/GPU, versione di Python, ecc.) e alla tua regione AWS. Ad esempio, se la regione è us-east-1, il percorso completo del registro Docker dovrebbe assomigliare a questo:763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference:2.0.0-gpu-py310

# Utilizza l'immagine PyTorch di SageMaker come immagine di base
FROM <docker_registry_url>.dkr.ecr.<my_aws_region>.amazonaws.com/pytorch-inference:2.0.0-gpu-py310
# Installa la dipendenza aggiuntiva
RUN pip install "git+https://github.com/hairozen/anomalib.git@ai-vad-inference-improvements"

Ora possiamo eseguire il comando classico di build di Docker per costruire questa immagine personalizzata.

docker build -t ai-vad-image .

Il passaggio successivo è creare il repository AWS ECR per la nuova immagine che abbiamo costruito, associarla ad un tag e caricare l’immagine nel repository AWS ECR.

# Crea il repository AWS ECR
aws ecr create-repository --repository-name ai-vad-image
# Associa il tag all'immagine
docker tag ai-vad-image:latest $ACCOUNT.dkr.ecr.$REGION.amazonaws.com/ai-vad-image:latest
# Carica l'immagine associata nel repository AWS ECR
docker push $ACCOUNT.dkr.ecr.$REGION.amazonaws.com/ai-vad-image:latest

Crea un Modello in SageMaker

Questo passaggio è piuttosto semplice. Ecco il codice.

import boto3
import sagemaker

sagemaker_client = boto3.client(service_name="sagemaker")
role = sagemaker.get_execution_role()
model_name = f"ai-vad-model-{current_datetime}"
primary_container = {
    "Image": f"{my_aws_account}.dkr.ecr.{my_aws_region}.amazonaws.com/ai-vad-image:latest",
    "ModelDataUrl": f"s3://ai-vad/{current_datetime}/ai_vad_model.tar.gz"
}
create_model_response = sagemaker_client.create_model(
    ModelName=model_name,
    ExecutionRoleArn=role,
    PrimaryContainer=primary_container)

Crea una Configurazione Endpoint

Il passaggio successivo è creare una configurazione endpoint. Di seguito puoi trovare un esempio base.

endpoint_config_name = f"ai-vad-model-config-{current_datetime}"
sagemaker_client.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[
        {
            "InstanceType": "ml.g5.xlarge",
            "InitialVariantWeight": 1,
            "InitialInstanceCount": 1,
            "ModelName": model_name,
            "VariantName": "AllTraffic"
        }
    ]
)

Crea un Endpoint

Ora siamo pronti per creare l’endpoint stesso.

endpoint_name = f"ai-vad-model-endpoint-{current_datetime}"
sagemaker_client.create_endpoint(
    EndpointName=endpoint_name,
    EndpointConfigName=endpoint_config_name
)

Nota che potrebbero essere necessari alcuni minuti affinché lo stato dell’endpoint passi da “Creazione” a “InServizio”. Lo stato attuale può essere verificato come mostrato di seguito.

response = sagemaker_client.describe_endpoint(EndpointName=endpoint_name)
response["EndpointStatus"]

Esegui l’Endpoint

È arrivato il momento della verità. Ora è il momento di invocare l’endpoint per testare se tutto funziona come previsto.

with open(file_name, "rb") as f:
    payload = f.read()
predictor = sagemaker.predictor.Predictor(endpoint_name=endpoint_name)
predictor.serializer = DataSerializer(content_type="image/x-image")
predictor.predict(payload)

Quindi, questo è un bel controllo, ma devi tenere presente che la funzione predictor.predict non esegue l’intera pipeline di invocazione dello script di servizio SageMaker, che include:output_fn(predict_fn(input_fn(input_data, model_fn(model_dir)),accept)

Per testare anche questo, invochiamo il modello usando una chiamata API.

with open(file_name, "rb") as f:
    payload = f.read()
sagemaker_runtime = boto3.client("runtime.sagemaker")
response = sagemaker_runtime.invoke_endpoint(
    EndpointName=endpoint_name,
    ContentType="image/x-image",
    Body=payload
)
response = json.loads(response["Body"].read().decode())

Utilizzando la fantastica visualizzazione fornita da anomalib, possiamo disegnare i risultati per il frame corrispondente al dataset UCSDped2.

Immagine dell'autore. L'immagine è stata generata utilizzando il pacchetto anomalib basato sul UCSD Anomaly Detection Dataset. I riquadri verdi indicano che non ci sono anomalie nel modo in cui camminano i pedoni, mentre il riquadro rosso, per il ciclista, indica un'anomalia probabilmente legata alla velocità e alla posizione del modello AI VAD.

Conclusioni

Ok, facciamo un breve riassunto di quello che abbiamo trattato qui. La distribuzione di un modello SageMaker per la fornitura richiede una serie di passaggi.

Innanzitutto, lo script di servizio del modello SageMaker deve essere scritto per definire la funzionalità e il comportamento del modello.

Il modello viene quindi caricato su Amazon S3 per l’archiviazione e il recupero. Inoltre, un’immagine Docker personalizzata viene caricata nell’AWS Elastic Container Registry (ECR) per contenere il modello e le sue dipendenze. Il passaggio successivo prevede la creazione di un modello in SageMaker, che associa gli artefatti del modello archiviati in S3 con l’immagine Docker archiviata in ECR.

Successivamente viene creata una configurazione dell’endpoint, che definisce il numero e il tipo di istanze da utilizzare per l’hosting del modello.

Infine, viene creato un endpoint per stabilire una connessione live tra il modello distribuito e le applicazioni client, consentendo loro di invocare l’endpoint e effettuare previsioni in tempo reale.

Attraverso questi passaggi, la distribuzione di un modello SageMaker diventa un processo streamlinato che garantisce una fornitura efficiente e affidabile del modello.

Appendice

Il paper “Attributo basato su rappresentazioni per la rilevazione accurata e interpretabile delle anomalie nei video” pubblicato nel 2023 da Reiss et al. propone un metodo semplice ma altamente efficace per la rilevazione delle anomalie nei video (VAD) utilizzando rappresentazioni basate sugli attributi.

Il documento sostiene che i metodi tradizionali VAD, che spesso si basano sull’apprendimento profondo, sono spesso difficili da interpretare, rendendo difficile per gli utenti capire perché il sistema segnali determinati fotogrammi o oggetti come anomali.

Per affrontare questa problematica, gli autori propongono un metodo che rappresenta ogni oggetto in un video in base alla sua velocità, posa e profondità. Questi attributi sono facili da comprendere e interpretare e possono essere utilizzati per calcolare i punteggi di anomalia attraverso un approccio basato sulla densità.

Il paper mostra che questa semplice rappresentazione è sufficiente per ottenere prestazioni all’avanguardia su diversi complessi dataset VAD, tra cui ShanghaiTech, il dataset VAD più grande e complesso.

Oltre ad essere accurato, gli autori dimostrano anche che il loro metodo è interpretabile. Ad esempio, possono fornire agli utenti un elenco degli oggetti in un video che contribuiscono di più al punteggio di anomalia, insieme alle relative informazioni sulla velocità, posa e profondità. Questo può aiutare gli utenti a capire perché il sistema segnala il video come anomalo.

In generale, questo paper rappresenta un importante contributo nel campo del VAD. Propone un metodo semplice, accurato e interpretabile per il VAD che può essere utilizzato in una varietà di applicazioni.