Implementare un modello di machine learning personalizzato come endpoint di SageMaker
Implementazione di un modello di machine learning personalizzato come endpoint di SageMaker
Deployment dell’Endpoint SageMaker
Una guida rapida e semplice per creare un endpoint SageMaker AWS per il tuo modello
Sviluppare un modello di machine learning (ML) comporta passaggi chiave, dalla raccolta dei dati alla distribuzione del modello. Dopo aver affinato gli algoritmi e garantito le prestazioni attraverso i test, l’ultimo passaggio cruciale è la distribuzione. Questa fase trasforma l’innovazione in utilità, consentendo agli altri di beneficiare delle capacità predictive del modello. Il modello di ML distribuito colma il divario tra lo sviluppo e l’impatto nel mondo reale, fornendo vantaggi tangibili agli utenti e agli stakeholder.
Questa guida copre i passaggi di base necessari per sviluppare un endpoint SageMaker ML personalizzato. A questo punto, si presume che tu abbia già un modello funzionante e desideri esporlo al resto del mondo tramite un endpoint. La guida ti aiuterà a distribuire un modello basato su PyTorch che mira a prevedere anomalie in video clip. Il modello, anche conosciuto come AI VAD, si basa sul paper “Attribute-based Representations for Accurate and Interpretable Video Anomaly Detection”, e la sua implementazione può essere trovata nel repository GitHub di anomalib di OpenVINO. Per saperne di più su questo interessante approccio, scorri fino alla fine di questo articolo nella sezione Appendice.
A questo punto, desidero sottolineare che in questo caso non possiamo utilizzare l’astrazione PyTorchModel specificamente creata per distribuire modelli PyTorch per due motivi. Il primo motivo è che abbiamo il pacchetto anomalib come dipendenza aggiuntiva che non è inclusa nell’immagine pre-costruita di PyTorch Sagemaker. Il secondo motivo è che il modello richiede informazioni aggiuntive apprese durante la fase di addestramento che non fanno parte dei pesi del modello PyTorch.
Ecco i passaggi per raggiungere questo obiettivo:
- Scienza dei dati nell’intrattenimento streaming vs cinema
- Quanto affidabile è un rapporto?
- Qual è la probabilità che due persone abbiano le stesse iniziali?
- Scrivi lo script di servizio del modello SageMaker
- Carica il modello su S3
- Carica un’immagine Docker personalizzata in AWS ECR
- Crea un Modello in SageMaker
- Crea una Configurazione dell’Endpoint
- Crea un Endpoint
- Invocare l’Endpoint
Scrivi lo script di servizio del modello SageMaker
Lo script di servizio del modello SageMaker (inference.py
) è un componente importante nella creazione di un modello SageMaker. Funge da ponte tra i modelli di machine learning e i dati del mondo reale. Essenzialmente, elabora le richieste in arrivo, esegue le previsioni del modello e restituisce i risultati. Influenza quindi il processo decisionale di un’applicazione.
Lo script inference.py
è composto da diversi metodi chiave, ognuno con uno scopo unico, che facilita collettivamente il processo di servizio del modello. Di seguito elenco i quattro principali.
- Il metodo
model_fn
è responsabile del caricamento del modello addestrato. Legge gli artefatti del modello che sono stati salvati e restituisce un oggetto modello che può essere utilizzato per le previsioni. Questo metodo viene chiamato solo una volta quando il server del modello SageMaker viene avviato. - Il metodo
input_fn
prende i dati della richiesta e li formatta in una forma adatta per effettuare previsioni. Ad esempio, nel codice sottostante questa funzione formatta i dati in modo diverso in base all’origine dei dati (byte dell’immagine o elenco di URI S3) e se l’elenco di frame deve essere considerato come una singola clip video. - Il metodo
predict_fn
prende i dati della richiesta formattati e esegue l’inférence sul modello caricato. - Infine, viene utilizzato il metodo
output_fn
. Prende il risultato della previsione e lo formatta in un messaggio di risposta. Ad esempio, lo incapsula in un oggetto JSON.
Di seguito puoi trovare il codice per lo script di servizio del modello Sagemaker.
import osimport jsonimport joblibimport torchfrom PIL import Imageimport numpy as npimport ioimport boto3from enum import Enumfrom urllib.parse import urlsplitfrom omegaconf import OmegaConffrom anomalib.data.utils import read_image, InputNormalizationMethod, get_transformsfrom anomalib.models.ai_vad.torch_model import AiVadModeldevice = "cuda"class PredictMode(Enum): frame = 1 batch = 2 clip = 3 def model_fn(model_dir): """ Questa funzione è la prima ad essere eseguita al momento di una richiesta di previsione, carica il modello dal disco e restituisce l'oggetto modello che verrà successivamente utilizzato per l'elaborazione. """ # Carica il file di configurazione config = OmegaConf.load(os.path.join(model_dir, "ai_vad_config.yaml")) config_model = config.model # Carica il modello model = AiVadModel( box_score_thresh=config_model.box_score_thresh, persons_only=config_model.persons_only, min_bbox_area=config_model.min_bbox_area, max_bbox_overlap=config_model.max_bbox_overlap, enable_foreground_detections=config_model.enable_foreground_detections, foreground_kernel_size=config_model.foreground_kernel_size, foreground_binary_threshold=config_model.foreground_binary_threshold, n_velocity_bins=config_model.n_velocity_bins, use_velocity_features=config_model.use_velocity_features, use_pose_features=config_model.use_pose_features, use_deep_features=config_model.use_deep_features, n_components_velocity=config_model.n_components_velocity, n_neighbors_pose=config_model.n_neighbors_pose, n_neighbors_deep=config_model.n_neighbors_deep, ) # Carica i pesi del modello model.load_state_dict(torch.load(os.path.join(model_dir, "ai_vad_weights.pth"), map_location=device), strict=False) # Carica le memory banks velocity_estimator_memory_bank, pose_estimator_memory_bank, appearance_estimator_memory_bank = joblib.load(os.path.join(model_dir, "ai_vad_banks.joblib")) if velocity_estimator_memory_bank is not None: model.density_estimator.velocity_estimator.memory_bank = velocity_estimator_memory_bank if pose_estimator_memory_bank is not None: model.density_estimator.pose_estimator.memory_bank = pose_estimator_memory_bank if appearance_estimator_memory_bank is not None: model.density_estimator.appearance_estimator.memory_bank = appearance_estimator_memory_bank model.density_estimator.fit() # Sposta l'intero modello sulla periferica model = model.to(device) # Ottieni le trasformazioni transform_config = config.dataset.transform_config.eval if "transform_config" in config.dataset.keys() else None image_size = (config.dataset.image_size[0], config.dataset.image_size[1]) center_crop = config.dataset.get("center_crop") center_crop = tuple(center_crop) if center_crop is not None else None normalization = InputNormalizationMethod(config.dataset.normalization) transform = get_transforms(config=transform_config, image_size=image_size, center_crop=center_crop, normalization=normalization) return model, transformdef input_fn(request_body, request_content_type): """ La request_body viene passata da SageMaker e il tipo di contenuto viene passato tramite un'intestazione HTTP dal client (o chiamante). """ print("input_fn-----------------------") if request_content_type in ("application/x-image", "image/x-image"): image = Image.open(io.BytesIO(request_body)).convert("RGB") numpy_array = np.array(image) print("numpy_array.shape", numpy_array.shape) print("input_fn-----------------------") return [numpy_array], PredictMode.frame elif request_content_type == "application/json": request_body_json = json.loads(request_body) s3_uris = request_body_json.get("images", []) if len(s3_uris) == 0: raise ValueError(f"Images è un campo obbligatorio e dovrebbe contenere almeno un URI S3 nella lista") s3 = boto3.client("s3") frame_paths = [] for s3_uri in s3_uris: parsed_url = urlsplit(s3_uri) bucket_name = parsed_url.netloc object_key = parsed_url.path.lstrip('/') local_frame_path = f"/tmp/{s3_uri.replace('/', '_')}" # Scarica il frame da S3 s3.download_file(bucket_name, object_key, local_frame_path) frame_paths.append(local_frame_path) frames = np.stack([torch.Tensor(read_image(frame_path)) for frame_path in frame_paths], axis=0) predict_mode = PredictMode.clip if request_body_json.get("clip", False) else PredictMode.batch print("frames.shape", frames.shape) print("predict_mode", predict_mode) print("input_fn-----------------------") return frames, predict_mode # Se il tipo di contenuto non è quello previsto, genera un'eccezione raise ValueError(f"Il tipo di contenuto {request_content_type} non è supportato")def predict_fn(input_data, model): """ Questa funzione prende in input i dati di input e il modello restituito dalla model_fn Viene eseguita dopo la model_fn e il suo output viene restituito come risposta API. """ print("predict_fn-----------------------") model, transform = model frames, predict_mode = input_data processed_data = {} processed_data["image"] = [transform(image=frame)["image"] for frame in frames] processed_data["image"] = torch.stack(processed_data["image"]) image = processed_data["image"].to(device) # Aggiungi un'altra dimensione per una dimensione di batch di un clip if predict_mode == PredictMode.clip: image = image.unsqueeze(0) print("image.shape", image.shape) model.eval() with torch.no_grad(): boxes, anomaly_scores, image_scores = model(image) print("boxes_len", [len(b) for b in boxes]) processed_data["pred_boxes"] = [box.int() for box in boxes] processed_data["box_scores"] = [score.to(device) for score in anomaly_scores] processed_data["pred_scores"] = torch.Tensor(image_scores).to(device) print("predict_fn-----------------------") return processed_datadef output_fn(prediction, accept): """ Funzione di post-elaborazione per le previsioni del modello. Viene eseguita dopo predict_fn. """ print("output_fn-----------------------") # Controlla se il tipo di accettazione è JSON if accept != "application/json": raise ValueError(f"Il tipo di accettazione {accept} non è supportato") # Converti i tensori PyTorch in liste in modo che possano essere serializzati in JSON for key in prediction: # Se è un torch.Tensor, convertilo in lista if isinstance(prediction[key], torch.Tensor): prediction[key] = prediction[key].tolist() # Se è una lista, converti ogni tensore nella lista elif isinstance(prediction[key], list): prediction[key] = [tensor.tolist() if isinstance(tensor, torch.Tensor) else tensor for tensor in prediction[key]] print("output_fn-----------------------") return json.dumps(prediction), accept
P.S. È fortemente consigliato testare lo script di servizio del modello prima di passare al passaggio successivo. Ciò può essere facilmente fatto simulando la pipeline di invocazione come mostrato nel codice sottostante.
import json
from inference import model_fn, predict_fn, input_fn, output_fn
response, accept = output_fn(
predict_fn(
input_fn(payload, "application/x-image"),
model_fn("../")
),
"application/json"
)
json.loads(response).keys()
Carica il modello su S3
Per creare un endpoint SageMaker che carichi il modello AI VAD PyTorch nello stesso stato esatto, sono necessari i seguenti file:
- Pesi del modello AI VAD PyTorch (aka state_dict)
- Banche di memoria dell’estimatore di densità (che non fanno parte dei pesi del modello)
- Un file di configurazione con gli iperparametri del modello PyTorch
- Uno script di servizio del modello Sagemaker (
inference.py
)
Il codice di seguito illustra come organizzare tutti i file richiesti in una singola directory.
P.S., Ho sovrascritto il callback ModelCheckpoint integrato di PyTorch per garantire che queste banche di memoria vengano salvate come parte del salvataggio del checkpoint (l’implementazione può essere trovata qui).
import torch
import joblib
import shutil
checkpoint = "results/ai_vad/ucsd/run/weights/lightning/model.ckpt"
config_path = "results/ai_vad/ucsd/run/config.yaml"
model_weights = torch.load(checkpoint)
model_state_dict = model_weights["state_dict"]
torch.save(model_state_dict, "../ai_vad_weights.pth")
velocity_estimator_memory_bank = None
pose_estimator_memory_bank = None
appearance_estimator_memory_bank = None
if "velocity_estimator_memory_bank" in model_weights:
velocity_estimator_memory_bank = model_weights["velocity_estimator_memory_bank"]
if "pose_estimator_memory_bank" in model_weights:
pose_estimator_memory_bank = model_weights["pose_estimator_memory_bank"]
if "appearance_estimator_memory_bank" in model_weights:
appearance_estimator_memory_bank = model_weights["appearance_estimator_memory_bank"]
banks = (velocity_estimator_memory_bank, pose_estimator_memory_bank, appearance_estimator_memory_bank)
joblib.dump(banks, "../ai_vad_banks.joblib")
shutil.copyfile(config_path, "../ai_vad_config.yaml")
Successivamente, i quattro file sono stati compressi insieme per creare il file tar.gz
utilizzando il comando seguente.
tar -czvf ../ai_vad_model.tar.gz -C ../ ai_vad_weights.pth ai_vad_banks.joblib ai_vad_config.yaml inference.py
Infine, il file è stato caricato su S3 utilizzando boto3.
import boto3
from datetime import datetime
current_datetime = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
s3 = boto3.resource('s3')
s3.meta.client.upload_file("../ai_vad_model.tar.gz", "ai-vad", f"{current_datetime}/ai_vad_model.tar.gz")
Carica un’immagine Docker personalizzata su AWS ECR
Come accennato in precedenza, poiché abbiamo una dipendenza aggiuntiva che non è inclusa nell’immagine predefinita di PyTorch Sagemaker (cioè il pacchetto anomalib), abbiamo creato una nuova immagine Docker a tale scopo. Prima di creare l’immagine Docker personalizzata, è necessaria l’autenticazione al repository Amazon ECR.
REGION=<my_aws_region>
ACCOUNT=<my_aws_account>
# Autentica Docker su un registro Amazon ECR
aws ecr get-login-password --region $REGION | docker login --username AWS --password-stdin <docker_registry_url>.dkr.ecr.$REGION.amazonaws.com
# Accedi al tuo registro Amazon ECR privato
aws ecr get-login-password --region $REGION | docker login --username AWS --password-stdin $ACCOUNT.dkr.ecr.$REGION.amazonaws.com
Il Dockerfile può essere trovato di seguito e i diversi percorsi del registro Docker possono essere trovati qui. Assicurati di selezionare il percorso corretto del registro in base alle esigenze del modello (CPU/GPU, versione di Python, ecc.) e alla tua regione AWS. Ad esempio, se la regione è us-east-1
, il percorso completo del registro Docker dovrebbe assomigliare a questo:763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference:2.0.0-gpu-py310
# Utilizza l'immagine PyTorch di SageMaker come immagine di base
FROM <docker_registry_url>.dkr.ecr.<my_aws_region>.amazonaws.com/pytorch-inference:2.0.0-gpu-py310
# Installa la dipendenza aggiuntiva
RUN pip install "git+https://github.com/hairozen/anomalib.git@ai-vad-inference-improvements"
Ora possiamo eseguire il comando classico di build di Docker per costruire questa immagine personalizzata.
docker build -t ai-vad-image .
Il passaggio successivo è creare il repository AWS ECR per la nuova immagine che abbiamo costruito, associarla ad un tag e caricare l’immagine nel repository AWS ECR.
# Crea il repository AWS ECR
aws ecr create-repository --repository-name ai-vad-image
# Associa il tag all'immagine
docker tag ai-vad-image:latest $ACCOUNT.dkr.ecr.$REGION.amazonaws.com/ai-vad-image:latest
# Carica l'immagine associata nel repository AWS ECR
docker push $ACCOUNT.dkr.ecr.$REGION.amazonaws.com/ai-vad-image:latest
Crea un Modello in SageMaker
Questo passaggio è piuttosto semplice. Ecco il codice.
import boto3
import sagemaker
sagemaker_client = boto3.client(service_name="sagemaker")
role = sagemaker.get_execution_role()
model_name = f"ai-vad-model-{current_datetime}"
primary_container = {
"Image": f"{my_aws_account}.dkr.ecr.{my_aws_region}.amazonaws.com/ai-vad-image:latest",
"ModelDataUrl": f"s3://ai-vad/{current_datetime}/ai_vad_model.tar.gz"
}
create_model_response = sagemaker_client.create_model(
ModelName=model_name,
ExecutionRoleArn=role,
PrimaryContainer=primary_container)
Crea una Configurazione Endpoint
Il passaggio successivo è creare una configurazione endpoint. Di seguito puoi trovare un esempio base.
endpoint_config_name = f"ai-vad-model-config-{current_datetime}"
sagemaker_client.create_endpoint_config(
EndpointConfigName=endpoint_config_name,
ProductionVariants=[
{
"InstanceType": "ml.g5.xlarge",
"InitialVariantWeight": 1,
"InitialInstanceCount": 1,
"ModelName": model_name,
"VariantName": "AllTraffic"
}
]
)
Crea un Endpoint
Ora siamo pronti per creare l’endpoint stesso.
endpoint_name = f"ai-vad-model-endpoint-{current_datetime}"
sagemaker_client.create_endpoint(
EndpointName=endpoint_name,
EndpointConfigName=endpoint_config_name
)
Nota che potrebbero essere necessari alcuni minuti affinché lo stato dell’endpoint passi da “Creazione” a “InServizio”. Lo stato attuale può essere verificato come mostrato di seguito.
response = sagemaker_client.describe_endpoint(EndpointName=endpoint_name)
response["EndpointStatus"]
Esegui l’Endpoint
È arrivato il momento della verità. Ora è il momento di invocare l’endpoint per testare se tutto funziona come previsto.
with open(file_name, "rb") as f:
payload = f.read()
predictor = sagemaker.predictor.Predictor(endpoint_name=endpoint_name)
predictor.serializer = DataSerializer(content_type="image/x-image")
predictor.predict(payload)
Quindi, questo è un bel controllo, ma devi tenere presente che la funzione predictor.predict
non esegue l’intera pipeline di invocazione dello script di servizio SageMaker, che include:output_fn(predict_fn(input_fn(input_data, model_fn(model_dir)),accept)
Per testare anche questo, invochiamo il modello usando una chiamata API.
with open(file_name, "rb") as f:
payload = f.read()
sagemaker_runtime = boto3.client("runtime.sagemaker")
response = sagemaker_runtime.invoke_endpoint(
EndpointName=endpoint_name,
ContentType="image/x-image",
Body=payload
)
response = json.loads(response["Body"].read().decode())
Utilizzando la fantastica visualizzazione fornita da anomalib, possiamo disegnare i risultati per il frame corrispondente al dataset UCSDped2.
Conclusioni
Ok, facciamo un breve riassunto di quello che abbiamo trattato qui. La distribuzione di un modello SageMaker per la fornitura richiede una serie di passaggi.
Innanzitutto, lo script di servizio del modello SageMaker deve essere scritto per definire la funzionalità e il comportamento del modello.
Il modello viene quindi caricato su Amazon S3 per l’archiviazione e il recupero. Inoltre, un’immagine Docker personalizzata viene caricata nell’AWS Elastic Container Registry (ECR) per contenere il modello e le sue dipendenze. Il passaggio successivo prevede la creazione di un modello in SageMaker, che associa gli artefatti del modello archiviati in S3 con l’immagine Docker archiviata in ECR.
Successivamente viene creata una configurazione dell’endpoint, che definisce il numero e il tipo di istanze da utilizzare per l’hosting del modello.
Infine, viene creato un endpoint per stabilire una connessione live tra il modello distribuito e le applicazioni client, consentendo loro di invocare l’endpoint e effettuare previsioni in tempo reale.
Attraverso questi passaggi, la distribuzione di un modello SageMaker diventa un processo streamlinato che garantisce una fornitura efficiente e affidabile del modello.
Appendice
Il paper “Attributo basato su rappresentazioni per la rilevazione accurata e interpretabile delle anomalie nei video” pubblicato nel 2023 da Reiss et al. propone un metodo semplice ma altamente efficace per la rilevazione delle anomalie nei video (VAD) utilizzando rappresentazioni basate sugli attributi.
Il documento sostiene che i metodi tradizionali VAD, che spesso si basano sull’apprendimento profondo, sono spesso difficili da interpretare, rendendo difficile per gli utenti capire perché il sistema segnali determinati fotogrammi o oggetti come anomali.
Per affrontare questa problematica, gli autori propongono un metodo che rappresenta ogni oggetto in un video in base alla sua velocità, posa e profondità. Questi attributi sono facili da comprendere e interpretare e possono essere utilizzati per calcolare i punteggi di anomalia attraverso un approccio basato sulla densità.
Il paper mostra che questa semplice rappresentazione è sufficiente per ottenere prestazioni all’avanguardia su diversi complessi dataset VAD, tra cui ShanghaiTech, il dataset VAD più grande e complesso.
Oltre ad essere accurato, gli autori dimostrano anche che il loro metodo è interpretabile. Ad esempio, possono fornire agli utenti un elenco degli oggetti in un video che contribuiscono di più al punteggio di anomalia, insieme alle relative informazioni sulla velocità, posa e profondità. Questo può aiutare gli utenti a capire perché il sistema segnala il video come anomalo.
In generale, questo paper rappresenta un importante contributo nel campo del VAD. Propone un metodo semplice, accurato e interpretabile per il VAD che può essere utilizzato in una varietà di applicazioni.