Orchestrare flussi di lavoro di machine learning basati su Ray utilizzando Amazon SageMaker

Orchestrate machine learning workflows using Ray on Amazon SageMaker

L’apprendimento automatico (ML) sta diventando sempre più complesso man mano che i clienti cercano di risolvere problemi sempre più sfidanti. Questa complessità spesso porta alla necessità di un ML distribuito, in cui vengono utilizzate più macchine per addestrare un singolo modello. Sebbene ciò consenta la parallelizzazione delle attività su più nodi, riducendo i tempi di addestramento, migliorando la scalabilità e le prestazioni, ci sono sfide significative nell’utilizzo efficace dell’hardware distribuito. Gli scienziati dei dati devono affrontare sfide come la suddivisione dei dati, il bilanciamento del carico, la tolleranza ai guasti e la scalabilità. Gli ingegneri di ML devono gestire parallelizzazione, pianificazione, guasti e ritentativi manualmente, richiedendo codice di infrastruttura complesso.

In questo post, discutiamo i vantaggi dell’utilizzo di Ray e Amazon SageMaker per l’ML distribuito e forniamo una guida passo passo su come utilizzare questi framework per creare e distribuire un flusso di lavoro ML scalabile.

Ray, un framework di calcolo distribuito open-source, fornisce un framework flessibile per l’addestramento e il servizio distribuito di modelli di ML. Astrae i dettagli del sistema distribuito a basso livello attraverso librerie semplici e scalabili per compiti di ML comuni come la preelaborazione dei dati, l’addestramento distribuito, il tuning degli iperparametri, il reinforcement learning e il servizio dei modelli.

SageMaker è un servizio completamente gestito per la creazione, l’addestramento e il rilascio di modelli di ML. Ray si integra perfettamente con le funzionalità di SageMaker per creare e distribuire carichi di lavoro di ML complessi ed efficienti. La combinazione di Ray e SageMaker fornisce funzionalità end-to-end per flussi di lavoro ML scalabili e ha le seguenti caratteristiche principali:

  • Gli attori e le costruzioni di parallelismo distribuito in Ray semplificano lo sviluppo di applicazioni distribuite.
  • Ray AI Runtime (AIR) riduce l’attrito nel passaggio dallo sviluppo alla produzione. Con Ray e AIR, lo stesso codice Python può scalare senza problemi da un laptop a un cluster di grandi dimensioni.
  • Le infrastrutture gestite di SageMaker e le funzionalità come i job di elaborazione, i job di addestramento e i job di tuning degli iperparametri possono utilizzare le librerie di Ray per il calcolo distribuito.
  • Amazon SageMaker Experiments consente di iterare rapidamente e tenere traccia dei tentativi.
  • Amazon SageMaker Feature Store fornisce un repository scalabile per archiviare, recuperare e condividere le caratteristiche di ML per l’addestramento dei modelli.
  • I modelli addestrati possono essere archiviati, versionati e monitorati in Amazon SageMaker Model Registry per la governance e la gestione.
  • Amazon SageMaker Pipelines consente di orchestrare l’intero ciclo di vita di ML dalla preparazione dei dati all’addestramento fino al rilascio del modello come flussi di lavoro automatizzati.

Panoramica della soluzione

In questo post ci concentriamo sui vantaggi dell’utilizzo di Ray e SageMaker insieme. Configuriamo un flusso di lavoro ML basato su Ray, orchestrato utilizzando SageMaker Pipelines. Il flusso di lavoro include l’ingestione parallela dei dati nel feature store utilizzando gli attori Ray, la preelaborazione dei dati con Ray Data, l’addestramento dei modelli e l’ottimizzazione degli iperparametri su larga scala utilizzando Ray Train e i job di ottimizzazione degli iperparametri (HPO), e infine la valutazione del modello e la registrazione del modello in un registro dei modelli.

Per i nostri dati, utilizziamo un dataset sintetico sugli immobili che consiste in otto caratteristiche (YEAR_BUILT, SQUARE_FEET, NUM_BEDROOM, NUM_BATHROOMS, LOT_ACRES, GARAGE_SPACES, FRONT_PORCH e DECK) e il nostro modello predirà il PRICE della casa.

Ogni fase del flusso di lavoro di ML è suddivisa in passi discreti, con il proprio script che prende parametri di input e output. Nella sezione successiva, evidenziamo frammenti di codice chiave da ciascun passo. Il codice completo può essere trovato sul repository GitHub aws-samples-for-ray.

Prerequisiti

Per utilizzare il SDK Python di SageMaker e eseguire il codice associato a questo post, sono necessari i seguenti prerequisiti:

  • Un account AWS che contiene tutte le risorse di AWS
  • Un ruolo di Identity and Access Management (IAM) con accesso a notebook di Amazon SageMaker Studio, al feature store di SageMaker, al registro dei modelli di SageMaker e alle pipeline di SageMaker

Inserire i dati nel feature store di SageMaker

Il primo passo nel flusso di lavoro di ML è leggere il file di dati di origine da Amazon Simple Storage Service (Amazon S3) in formato CSV e inserirlo nel feature store di SageMaker. Il feature store di SageMaker è un repository appositamente progettato che semplifica la creazione, la condivisione e la gestione delle caratteristiche di ML. Semplifica la scoperta, il riutilizzo e la condivisione delle caratteristiche, portando a uno sviluppo più rapido, a una maggiore collaborazione tra i team dei clienti e a costi ridotti.

L’inserimento delle caratteristiche nel feature store contiene i seguenti passaggi:

  1. Definire un gruppo di caratteristiche e creare il gruppo di caratteristiche nel feature store.
  2. Preparare i dati di origine per il feature store aggiungendo un tempo di evento e un ID di record per ogni riga di dati.
  3. Inserire i dati preparati nel gruppo di caratteristiche utilizzando il Boto3 SDK.

In questa sezione, evidenziamo solo il Passaggio 3, perché è la parte che coinvolge l’elaborazione parallela del compito di ingestione utilizzando Ray. È possibile esaminare l’intero codice di questo processo nel repository GitHub.

Il metodo ingest_features è definito all’interno di una classe chiamata Featurestore. Nota che la classe Featurestore è decorata con @ray.remote. Questo indica che un’istanza di questa classe è un attore Ray, un’unità computazionale statale e concorrente all’interno di Ray. È un modello di programmazione che consente di creare oggetti distribuiti che mantengono uno stato interno e possono essere accessibili contemporaneamente da più attività in esecuzione su nodi diversi in un cluster Ray. Gli attori forniscono un modo per gestire ed incapsulare lo stato mutabile, rendendoli preziosi per la creazione di applicazioni complesse e con stato in un contesto distribuito. È anche possibile specificare i requisiti di risorse negli attori. In questo caso, ogni istanza della classe FeatureStore richiederà 0,5 CPU. Vedi il seguente codice:

@ray.remote(num_cpus=0.5)
class Featurestore:
    def ingest_features(self,feature_group_name, df, region):
        """
        Ingest features to Feature Store Group
        Args:
            feature_group_name (str): Nome del gruppo di funzionalità
            data_path (str): Percorso per il train/validation/test dei dati in formato CSV.
        """
        
        ...

È possibile interagire con l’attore chiamando l’operatore remote. Nel seguente codice, il numero desiderato di attori viene passato come argomento di input allo script. I dati vengono quindi suddivisi in base al numero di attori e passati ai processi paralleli remoti per essere inseriti nel feature store. È possibile chiamare get sull’oggetto di riferimento per bloccare l’esecuzione dell’attività corrente fino al completamento del calcolo remoto e alla disponibilità del risultato. Quando il risultato è disponibile, ray.get restituirà il risultato e l’esecuzione dell’attività corrente continuerà.

import modin.pandas as pd
import ray

df = pd.read_csv(s3_path)
data = prepare_df_for_feature_store(df)
# Split into partitions
partitions = [ray.put(part) for part in np.array_split(data, num_actors)]
# Start actors and assign partitions in a loop
actors = [Featurestore.remote() for _ in range(args.num_actors)]
results = []

for actor, partition in zip(actors, input_partitions):
    results.append(actor.ingest_features.remote(
                        args.feature_group_name, 
                        partition, args.region
                      )
                )

ray.get(results)

Preparare i dati per l’addestramento, la convalida e il test

In questo passaggio, utilizziamo Ray Dataset per suddividere, trasformare e scalare efficacemente il nostro dataset in preparazione per l’apprendimento automatico. Ray Dataset fornisce un modo standard per caricare dati distribuiti in Ray, supportando vari sistemi di archiviazione e formati di file. Ha API per le operazioni comuni di preelaborazione dei dati ML come trasformazioni parallele, mescolamenti, raggruppamenti e aggregazioni. Ray Dataset gestisce anche operazioni che richiedono una configurazione con stato e l’accelerazione GPU. Si integra facilmente con altre librerie di elaborazione dati come Spark, Pandas, NumPy e altre, nonché con framework di apprendimento automatico come TensorFlow e PyTorch. Ciò consente di creare pipeline di dati end-to-end e flussi di lavoro di apprendimento automatico su Ray. L’obiettivo è rendere più facile il processamento dei dati distribuiti e l’apprendimento automatico per i praticanti e i ricercatori.

Analizziamo le sezioni degli script che eseguono questa preelaborazione dei dati. Iniziamo caricando i dati dal feature store:

def load_dataset(feature_group_name, region):
    """
    Carica i dati come dataset Ray dalla posizione S3 del feature store offline
    Args:
        feature_group_name (str): nome del gruppo di funzionalità
    Returns:
        ds (ray.data.dataset): dataset Ray che contiene i dati richiesti dal feature store
    """
    session = sagemaker.Session(boto3.Session(region_name=region))
    fs_group = FeatureGroup(
        name=feature_group_name, 
        sagemaker_session=session
    )

    fs_data_loc = fs_group.describe().get("OfflineStoreConfig").get("S3StorageConfig").get("ResolvedOutputS3Uri")
    
    # Rimuovi colonne aggiunte dal feature store
    # Poiché non sono correlate al problema di ML in questione
    cols_to_drop = ["record_id", "event_time","write_time", 
                    "api_invocation_time", "is_deleted", 
                    "year", "month", "day", "hour"]           

    ds = ray.data.read_parquet(fs_data_loc)
    ds = ds.drop_columns(cols_to_drop)
    print(f"{fs_data_loc} la conta è {ds.count()}")
    return ds

Quindi suddividiamo e scaliamo i dati utilizzando le astrazioni di livello superiore disponibili dalla libreria ray.data:

def split_dataset(dataset, train_size, val_size, test_size, random_state=None):
    """
    Dividi il dataset in campioni di allenamento, validazione e test
    Argomenti:
        dataset (ray.data.Dataset): dati di input
        train_size (float): proporzione dei dati da utilizzare come dataset di allenamento
        val_size (float): proporzione dei dati da utilizzare come dataset di validazione
        test_size (float): proporzione dei dati da utilizzare come dataset di test
        random_state (int): Passare un int per output riproducibile in chiamate di funzione multiple.
    Restituisce:
        train_set (ray.data.Dataset): dataset di allenamento
        val_set (ray.data.Dataset): dataset di validazione
        test_set (ray.data.Dataset): dataset di test
    """
    # Mescola questo dataset con un seme casuale fisso.
    shuffled_ds = dataset.random_shuffle(seed=random_state)
    # Suddividi i dati in dataset di allenamento, validazione e test
    train_set, val_set, test_set = shuffled_ds.split_proportionately([train_size, val_size])
    return train_set, val_set, test_set

def scale_dataset(train_set, val_set, test_set, target_col):
    """
    Adatta StandardScaler a train_set e applicalo a val_set e test_set
    Argomenti:
        train_set (ray.data.Dataset): dataset di allenamento
        val_set (ray.data.Dataset): dataset di validazione
        test_set (ray.data.Dataset): dataset di test
        target_col (str): colonna target
    Restituisce:
        train_transformed (ray.data.Dataset): dati di allenamento scalati
        val_transformed (ray.data.Dataset): dati di validazione scalati
        test_transformed (ray.data.Dataset): dati di test scalati
    """
    tranform_cols = dataset.columns()
    # Rimuovi le colonne target da scalare
    tranform_cols.remove(target_col)
    # Imposta uno scaler standard
    standard_scaler = StandardScaler(tranform_cols)
    # Adatta lo scaler al dataset di allenamento
    print("Adattamento della scalatura ai dati di allenamento e trasformazione del dataset...")
    train_set_transformed = standard_scaler.fit_transform(train_set)
    # Applica lo scaler ai dataset di validazione e test
    print("Trasformazione dei dataset di validazione e test...")
    val_set_transformed = standard_scaler.transform(val_set)
    test_set_transformed = standard_scaler.transform(test_set)
    return train_set_transformed, val_set_transformed, test_set_transformed

I dataset di allenamento, validazione e test elaborati vengono archiviati in Amazon S3 e verranno passati come parametri di input ai passaggi successivi.

Esegui l’addestramento del modello e l’ottimizzazione degli iperparametri

Con i nostri dati preelaborati e pronti per la modellazione, è ora di addestrare alcuni modelli di ML e ottimizzare i loro iperparametri per massimizzare le prestazioni predittive. Utilizziamo XGBoost-Ray, un backend distribuito per XGBoost costruito su Ray che consente di addestrare modelli XGBoost su grandi dataset utilizzando nodi e GPU multipli. Fornisce semplici sostituzioni plug-and-play per le API di addestramento e previsione di XGBoost, gestendo al contempo le complessità della gestione dei dati distribuita e dell’addestramento sottostante.

Per abilitare la distribuzione dell’addestramento su più nodi, utilizziamo una classe di supporto chiamata RayHelper. Come mostrato nel codice seguente, utilizziamo la configurazione delle risorse del job di addestramento e scegliamo il primo host come nodo principale:

class RayHelper():
    def __init__(self, ray_port:str="9339", redis_pass:str="redis_password"):
        ....
        self.resource_config = self.get_resource_config()
        self.head_host = self.resource_config["hosts"][0]
        self.n_hosts = len(self.resource_config["hosts"])

Possiamo utilizzare le informazioni sull’host per decidere come inizializzare Ray su ciascuna delle istanze del job di addestramento:

def start_ray(self): 
   head_ip = self._get_ip_from_host()
   # Se l'host corrente è l'host scelto come nodo principale
   # esegui `ray start` specificando il flag --head per rendere questo il nodo principale
    if self.resource_config["current_host"] == self.head_host:
        output = subprocess.run(['ray', 'start', '--head', '-vvv', '--port', 
        self.ray_port, '--redis-password', self.redis_pass, 
        '--include-dashboard', 'false'], stdout=subprocess.PIPE)
        print(output.stdout.decode("utf-8"))
        ray.init(address="auto", include_dashboard=False)
        self._wait_for_workers()
        print("Tutti i worker presenti e controllati")
        print(ray.cluster_resources())

    else:
       # Se l'host corrente non è il nodo principale, 
       # esegui `ray start` specificando l'indirizzo IP come head_host come nodo principale
        time.sleep(10)
        output = subprocess.run(['ray', 'start', 
        f"--address={head_ip}:{self.ray_port}", 
        '--redis-password', self.redis_pass, "--block"], stdout=subprocess.PIPE)
        print(output.stdout.decode("utf-8"))
        sys.exit(0)

Quando viene avviato un job di addestramento, un cluster Ray può essere inizializzato chiamando il metodo start_ray() su un’istanza di RayHelper:

if __name__ == '__main__':
    ray_helper = RayHelper()
    ray_helper.start_ray()
    args = read_parameters()
    sess = sagemaker.Session(boto3.Session(region_name=args.region))

Successivamente utilizzeremo il trainer XGBoost di XGBoost-Ray per l’addestramento:

def train_xgboost(ds_train, ds_val, params, num_workers, target_col = "price") -> Result:
    """
    Crea un trainer XGBoost, lo addestra e restituisce il risultato.
    Args:
        ds_train (ray.data.dataset): Dataset di addestramento
        ds_val (ray.data.dataset): Dataset di validazione
        params (dict): Iperparametri
        num_workers (int): numero di worker per distribuire l'addestramento
        target_col (str): colonna target
    Returns:
        result (ray.air.result.Result): Risultato del job di addestramento
    """
    
    train_set = RayDMatrix(ds_train, 'PRICE')
    val_set = RayDMatrix(ds_val, 'PRICE')
    
    evals_result = {}
    
    trainer = train(
        params=params,
        dtrain=train_set,
        evals_result=evals_result,
        evals=[(val_set, "validation")],
        verbose_eval=False,
        num_boost_round=100,
        ray_params=RayParams(num_actors=num_workers, cpus_per_actor=1),
    )
    
    output_path=os.path.join(args.model_dir, 'model.xgb')
    
    trainer.save_model(output_path)
    
    valMAE = evals_result["validation"]["mae"][-1]
    valRMSE = evals_result["validation"]["rmse"][-1]
 
    print('[3] #011validation-mae:{}'.format(valMAE))
    print('[4] #011validation-rmse:{}'.format(valRMSE))
    
    local_testing = False
    try:
        load_run(sagemaker_session=sess)
    except:
        local_testing = True
    if not local_testing: # Traccia l'esperimento se si utilizza SageMaker Training
        with load_run(sagemaker_session=sess) as run:
            run.log_metric('validation-mae', valMAE)
            run.log_metric('validation-rmse', valRMSE)

Si noti che durante l’istanziazione del trainer, passiamo RayParams, che prende il numero di attori e il numero di CPU per attore. XGBoost-Ray utilizza queste informazioni per distribuire l’addestramento su tutti i nodi collegati al cluster Ray.

Ora creiamo un oggetto XGBoost estimator basato su SageMaker Python SDK e lo utilizziamo per il job HPO.

Orchestrazione dei passaggi precedenti utilizzando SageMaker Pipelines

Per creare un flusso di lavoro di ML scalabile e riutilizzabile end-to-end, è necessario utilizzare un tool CI/CD per orchestrare i passaggi precedenti in un pipeline. SageMaker Pipelines ha una integrazione diretta con SageMaker, SageMaker Python SDK e SageMaker Studio. Questa integrazione consente di creare flussi di lavoro di ML con un SDK Python facile da usare, e quindi visualizzare e gestire il flusso di lavoro utilizzando SageMaker Studio. È inoltre possibile tenere traccia della cronologia dei dati all’interno dell’esecuzione della pipeline e designare passaggi per la memorizzazione nella cache.

SageMaker Pipelines crea un grafo aciclico diretto (DAG) che include i passaggi necessari per creare un flusso di lavoro di ML. Ogni pipeline è una serie di passaggi interconnessi orchestrati da dipendenze di dati tra i passaggi e può essere parametrizzato, consentendo di fornire variabili di input come parametri per ogni esecuzione della pipeline. SageMaker Pipelines ha quattro tipi di parametri di pipeline: ParameterString, ParameterInteger, ParameterFloat e ParameterBoolean. In questa sezione, parametrizziamo alcune delle variabili di input e configuriamo la configurazione della memorizzazione nella cache:

processing_instance_count = ParameterInteger(
    name='ProcessingInstanceCount',
    default_value=1
)
feature_group_name = ParameterString(
    name='FeatureGroupName',
    default_value='fs-ray-synthetic-housing-data'
)
bucket_prefix = ParameterString(
    name='Bucket_Prefix',
    default_value='aws-ray-mlops-workshop/feature-store'
)
rmse_threshold = ParameterFloat(name="RMSEThreshold", default_value=15000.0)
    train_size = ParameterString(
    name='TrainSize',
    default_value="0.6"
)
val_size = ParameterString(
    name='ValidationSize',
    default_value="0.2"
)
test_size = ParameterString(
    name='TestSize',
    default_value="0.2"
)

cache_config = CacheConfig(enable_caching=True, expire_after="PT12H")

Definiamo due fasi di elaborazione: una per l’ingestione di SageMaker Feature Store, l’altra per la preparazione dei dati. Questo dovrebbe assomigliare molto alle fasi precedenti descritte in precedenza. L’unica nuova riga di codice è il ProcessingStep dopo la definizione delle fasi, che ci consente di prendere la configurazione del job di elaborazione e includerla come passaggio del pipeline. Specifichiamo inoltre la dipendenza della fase di preparazione dei dati dalla fase di ingestione di SageMaker Feature Store. Vedere il seguente codice:

feature_store_ingestion_step = ProcessingStep(
    name='IngestioneFeatureStore',
    step_args=fs_processor_args,
    cache_config=cache_config
)

preprocess_dataset_step = ProcessingStep(
    name='PreparazioneDati',
    step_args=processor_args,
    cache_config=cache_config
)
preprocess_dataset_step.add_depends_on([feature_store_ingestion_step])

Allo stesso modo, per creare una fase di addestramento e ottimizzazione del modello, è necessario aggiungere una definizione di TuningStep dopo il codice della fase di addestramento del modello per consentirci di eseguire l’ottimizzazione dei parametri di SageMaker come passaggio nel pipeline:

tuning_step = TuningStep(
    name="OttimizzazioneHP",
    tuner=tuner,
    inputs={
        "train": TrainingInput(
            s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[
            "train"
            ].S3Output.S3Uri,
            content_type="text/csv"
        ),
        "validation": TrainingInput(
            s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[
            "validation"
            ].S3Output.S3Uri,
            content_type="text/csv"
        )
    },
    cache_config=cache_config,
)
tuning_step.add_depends_on([preprocess_dataset_step])

Dopo la fase di ottimizzazione, scegliamo di registrare il miglior modello nel SageMaker Model Registry. Per controllare la qualità del modello, implementiamo un criterio di qualità minima che confronta la metrica obiettivo del miglior modello (RMSE) con una soglia definita come parametro di input del pipeline rmse_threshold. Per effettuare questa valutazione, creiamo un’altra fase di elaborazione per eseguire uno script di valutazione. Il risultato della valutazione del modello verrà archiviato come file di proprietà. I file di proprietà sono particolarmente utili quando si analizzano i risultati di una fase di elaborazione per decidere come eseguire altre fasi. Vedere il seguente codice:

# Specifica dove archivieremo i risultati della valutazione del modello in modo che altre fasi possano accedere a tali risultati
evaluation_report = PropertyFile(
    name='ReportValutazione',
    output_name='valutazione',
    path='valutazione.json',
)

# Una ProcessingStep viene utilizzata per valutare le prestazioni di un modello selezionato dalla fase HPO.
# In questo caso, viene valutato il modello che ha prestazioni migliori.
evaluation_step = ProcessingStep(
    name='ValutaModello',
    processor=evaluation_processor,
    inputs=[
        ProcessingInput(
            source=tuning_step.get_top_model_s3_uri(
                top_k=0, s3_bucket=bucket, prefix=s3_prefix
            ),
            destination='/opt/ml/processing/modello',
        ),
        ProcessingInput(
            source=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs['test'].S3Output.S3Uri,
            destination='/opt/ml/processing/test',
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name='valutazione', source='/opt/ml/processing/valutazione'
        ),
    ],
    code='./pipeline_scripts/valuta/script.py',
    property_files=[evaluation_report],
)

Definiamo un ModelStep per registrare il miglior modello nel SageMaker Model Registry nel nostro pipeline. Nel caso in cui il miglior modello non superi il nostro controllo di qualità predefinito, specifichiamo inoltre un FailStep per restituire un messaggio di errore:

register_step = ModelStep(
    name='RegistraModelloAddestrato',
    step_args=model_registry_args
)

metrics_fail_step = FailStep(
    name="FallimentoRMSE",
    error_message=Join(on=" ", values=["L'esecuzione è fallita a causa di RMSE >", rmse_threshold]),
)

Successivamente, utilizziamo un ConditionStep per valutare se il passaggio di registrazione del modello o il passaggio di fallimento deve essere eseguito successivamente nel pipeline. Nel nostro caso, il miglior modello verrà registrato se il suo punteggio RMSE è inferiore alla soglia.

# Passaggio di condizione per valutare la qualità del modello e biforca l'esecuzione
cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=evaluation_step.name,
        property_file=evaluation_report,
        json_path='metriche_regressione.rmse.valore',
    ),
    right=rmse_threshold,
)
condition_step = ConditionStep(
    name='VerificaValutazione',
    conditions=[cond_lte],
    if_steps=[register_step],
    else_steps=[metrics_fail_step],
)

Infine, orchestreremo tutti i passaggi definiti in un pipeline:

pipeline_name = 'synthetic-housing-training-sm-pipeline-ray'
step_list = [
             feature_store_ingestion_step,
             preprocess_dataset_step,
             tuning_step,
             evaluation_step,
             condition_step
            ]

training_pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        feature_group_name,
        train_size,
        val_size,
        test_size,
        bucket_prefix,
        rmse_threshold
    ],
    steps=step_list
)

# Nota: Se una pipeline esistente ha lo stesso nome, verrà sovrascritta.
training_pipeline.upsert(role_arn=role_arn)

La pipeline precedente può essere visualizzata ed eseguita direttamente in SageMaker Studio, oppure può essere eseguita chiamando execution = training_pipeline.start(). La seguente figura illustra il flusso della pipeline.

Inoltre, possiamo esaminare la discendenza degli artefatti generati dall’esecuzione della pipeline.

from sagemaker.lineage.visualizer import LineageTableVisualizer

viz = LineageTableVisualizer(sagemaker.session.Session())
for execution_step in reversed(execution.list_steps()):
    print(execution_step)
    display(viz.show(pipeline_execution_step=execution_step))
    time.sleep(5)

Deploy del modello

Dopo che il miglior modello viene registrato nel registro modelli di SageMaker tramite un’operazione di pipeline, effettuiamo il deploy del modello su un endpoint in tempo reale utilizzando le funzionalità di deploy completamente gestite di SageMaker. SageMaker offre altre opzioni di deploy dei modelli per soddisfare le esigenze di diversi casi d’uso. Per ulteriori dettagli, consulta Deploy models for inference when choosing the right option for your use case. Prima di tutto, otteniamo la registrazione del modello nel registro modelli di SageMaker:

xgb_regressor_model = ModelPackage(
    role_arn,
    model_package_arn=model_package_arn,
    name=model_name
)

Lo stato attuale del modello è PendingApproval. Prima di effettuare il deploy, dobbiamo impostare il suo stato su Approved:

sagemaker_client.update_model_package(
    ModelPackageArn=xgb_regressor_model.model_package_arn,
    ModelApprovalStatus='Approved'
)

xgb_regressor_model.deploy(
    initial_instance_count=1,
    instance_type='ml.m5.xlarge',
    endpoint_name=endpoint_name
)

Pulizia

Dopo aver terminato gli esperimenti, ricordati di pulire le risorse per evitare spese inutili. Per la pulizia, elimina l’endpoint in tempo reale, il gruppo di modelli, la pipeline e il gruppo di feature chiamando le API DeleteEndpoint, DeleteModelPackageGroup, DeletePipeline e DeleteFeatureGroup, rispettivamente, e spegni tutte le istanze di notebook di SageMaker Studio.

Conclusioni

In questo post è stata dimostrata una procedura passo-passo su come utilizzare le pipeline di SageMaker per orchestrare flussi di lavoro di ML basati su Ray. Abbiamo anche dimostrato la capacità delle pipeline di SageMaker di integrarsi con strumenti di ML di terze parti. Ci sono vari servizi AWS che supportano carichi di lavoro Ray in modo scalabile e sicuro per garantire eccellenza delle prestazioni ed efficienza operativa. Ora è il tuo turno di esplorare queste potenti capacità e iniziare ad ottimizzare i tuoi flussi di lavoro di machine learning con Amazon SageMaker Pipelines e Ray. Prendi oggi stesso l’iniziativa e sblocca tutto il potenziale dei tuoi progetti di ML!