Un progetto di previsione dell’abbandono del cliente potenziato da MLOps

Un potenziamento di MLOps per prevedere l'abbandono del cliente

Introduzione

Quando sentiamo parlare di data science, la prima cosa che ci viene in mente è costruire un modello su notebook e addestrare i dati. Ma questa non è la situazione nella data science del mondo reale. Nel mondo reale, gli scienziati dei dati costruiscono modelli e li mettono in produzione. L’ambiente di produzione ha una lacuna tra lo sviluppo, il rilascio e la affidabilità del modello, e per agevolare le operazioni efficienti e scalabili, gli scienziati dei dati utilizzano MLOps (Machine Learning Operations) per costruire e distribuire applicazioni ML in un ambiente di produzione. In questo articolo, costruiremo e distribuiremo un progetto di previsione del churn dei clienti utilizzando MLOps.

Obiettivi di apprendimento

In questo articolo, imparerai:

  • Panoramica del progetto
  • Introduzione a ZenML e ai fondamenti di MLOPS
  • Scopri come distribuire il modello localmente per la previsione
  • Entra nel preprocessing e nell’ingegneria dei dati, nell’addestramento e nella valutazione del modello

Questo articolo è stato pubblicato come parte del Data Science Blogathon.

Panoramica del progetto

Per prima cosa, dobbiamo capire di cosa si tratta il nostro progetto. Per questo progetto, abbiamo un dataset da un’azienda di telecomunicazioni. Ora, per costruire un modello per prevedere se l’utente è probabile che continui il servizio dell’azienda o meno. Costruiremo questa applicazione ML utilizzando l’aiuto di ZenmML e MLFlow. Questo è il flusso di lavoro del nostro progetto.

Il flusso di lavoro del nostro progetto

  • Raccolta dei dati
  • Preprocessing dei dati
  • Addestramento del modello
  • Valutazione del modello
  • Deployment

Cos’è MLOps?

MLOps è un ciclo di vita end-to-end dell’apprendimento automatico, dallo sviluppo al rilascio e alla manutenzione continua. MLOps è la pratica di ottimizzare e automatizzare l’intero ciclo di vita dei modelli di apprendimento automatico, garantendo al contempo scalabilità, affidabilità ed efficienza.

Spieghiamolo con un esempio semplice:

Immagina di costruire un grattacielo nella tua città. La costruzione dell’edificio è completata. Ma manca l’elettricità, l’acqua, il sistema di drenaggio, ecc. Il grattacielo sarà non funzionale e impraticabile.

Lo stesso si applica ai modelli di apprendimento automatico. Se questi modelli sono progettati senza considerare il rilascio del modello, la scalabilità e la manutenzione a lungo termine, possono diventare inefficaci e impraticabili. Questo rappresenta un ostacolo importante per gli scienziati dei dati nella costruzione di modelli di apprendimento automatico da utilizzare in ambienti di produzione.

MLOps è un insieme di migliori pratiche e strategie che guidano la produzione, il rilascio e la manutenzione a lungo termine dei modelli di apprendimento automatico. Garantisce che questi modelli non solo forniscono previsioni accurate, ma rimangono anche asset robusti, scalabili e preziosi per le aziende. Quindi, senza MLOps, sarebbe un incubo svolgere tutte queste attività in modo efficiente, il che è una sfida. In questo progetto, spiegheremo come funziona MLOps, le diverse fasi e un progetto end-to-end su come costruire un modello di previsione del churn dei clienti.

Presentando ZenML

ZenML è un framework MLOPS open source che aiuta a costruire pipeline portatili e pronte per la produzione. Il framework ZenML ci aiuterà a realizzare questo progetto utilizzando MLOPS.

⚠️ Se sei un utente Windows, prova ad installare wsl su un PC. Zenml non è supportato in Windows.

Prima di passare ai progetti.

Concetti fondamentali di MLOPS

  • Passaggi: I passaggi sono singole unità di compiti in una pipeline o workflow. Ogni passaggio rappresenta un’azione specifica o un’operazione che deve essere eseguita per sviluppare un workflow di machine learning. Ad esempio, la pulizia dei dati, il pre-elaborazione dei dati, l’addestramento dei modelli, ecc., sono determinati passaggi nello sviluppo di un modello di apprendimento automatico.
  • Pipelines: Collegate più passaggi insieme per creare un processo strutturato e automatizzato per compiti di apprendimento automatico. Ad esempio, la pipeline di elaborazione dei dati, la pipeline di valutazione del modello e la pipeline di addestramento del modello.

Per iniziare

Crea un ambiente virtuale per il progetto:

conda create -n churn_prediction python=3.9

Quindi installa queste librerie:

pip install numpy pandas matplotlib scikit-learn

Dopo aver installato questo, installa ZenML:

pip install zenml["server"]

Quindi inizializza il repository ZenML.

zenml init

Ottieni un segnale verde per proseguire se il tuo schermo mostra questo. Dopo l’inizializzazione, verrà creato una cartella .zenml nella tua directory.

Crea una cartella per i dati nella directory. Ottieni i dati a questo link:

Crea le cartelle secondo questa struttura.

Raccolta dati

In questo passaggio, importeremo i dati dal nostro file CSV. Questi dati verranno utilizzati per addestrare il modello dopo la pulizia e la codifica.

Crea un file ingest_data.py dentro la cartella steps.

import pandas as pdimport numpy as npimport loggingfrom zenml import stepclass IngestData:    """    Ingestione dei dati nel workflow.    """    def __init__(self, path:str) -> None:        """        Argomenti:            data_path(str): percorso del file dei dati         """        self.path = path        def get_data(self):        df = pd.read_csv(self.path)        logging.info("Lettura del file csv completata con successo.")        return df    @step(enable_cache = False)def ingest_df(data_path:str) -> pd.DataFrame:    """    Passo ZenML per l'ingestione dei dati da un file CSV.        """    try:        #Creazione di un'istanza della classe IngestData e ingestione dei dati        ingest_data = IngestData(data_path)        df = ingest_data.get_data()        logging.info("Ingestione dei dati completata")        return df    except Exception as e:        #Registra un messaggio di errore se l'ingestione dei dati fallisce e solleva l'eccezione        logging.error("Errore durante l'ingestione dei dati")        raise e

Ecco il link del progetto.

In questo codice, abbiamo prima creato la classe IngestData per incapsulare la logica di ingestione dei dati. Quindi abbiamo creato un passo ZenML, ingest_df, che è un’unità individuale della pipeline di raccolta dati.

Crea un file training_pipeline.py dentro la cartella pipeline.

Scrivi il codice

from zenml import pipelinefrom steps.ingest_data import ingest_df#Definisci una pipeline ZenML chiamata training_pipeline.@pipeline(enable_cache=False)def train_pipeline(data_path:str):    '''    Pipeline dei dati per addestrare il modello.    Argomenti:        data_path (str): Il percorso dei dati da ingestiire.    '''    df = ingest_df(data_path=data_path)

Qui stiamo creando una pipeline di addestramento per addestrare un modello di apprendimento automatico utilizzando una serie di passaggi.

Quindi crea un file chiamato run_pipeline.py nella cartella principale per eseguire la pipeline.

from pipelines.training_pipeline import train_pipelineif __name__ == '__main__':    #Esegui la pipeline    train_pipeline(data_path="/mnt/e/Customer_churn/data/WA_Fn-UseC_-Telco-Customer-Churn.csv")

Questo codice viene utilizzato per eseguire la pipeline.

Ora abbiamo completato la pipeline di acquisizione dati. Eseguiamola.

Esegui il comando nel tuo terminale:

python run_pipeline.py

Successivamente, puoi vedere i comandi, che indicano che la pipeline di addestramento è stata completata con successo.

Preelaborazione dei dati

In questo passaggio, creeremo diverse strategie per la pulizia dei dati. Le colonne indesiderate verranno eliminate e le colonne categoriche verranno codificate utilizzando la codifica dei label. Infine, i dati verranno divisi in dati di addestramento e di test.

Crea un file chiamato clean_data.py nella cartella src.

In questo file, creeremo classi di strategie per la pulizia dei dati.

import pandas as pd
import numpy as np
import logging
from sklearn.model_selection import train_test_split
from abc import abstractmethod, ABC
from typing import Union
from sklearn.preprocessing import LabelEncoder

class DataStrategy(ABC):
    @abstractmethod
    def handle_data(self, df:pd.DataFrame) -> Union[pd.DataFrame,pd.Series]:
        pass

class DataPreprocessing(DataStrategy):
    def handle_data(self, df: pd.DataFrame) -> Union[pd.DataFrame, pd.Series]:
        try:
            df['TotalCharges'] = df['TotalCharges'].replace(' ', 0).astype(float)
            df.drop('customerID', axis=1, inplace=True)
            df['Churn'] = df['Churn'].replace({'Yes': 1, 'No': 0}).astype(int)
            service = ['PhoneService', 'MultipleLines', 'InternetService', 'OnlineSecurity',
                       'OnlineBackup', 'DeviceProtection', 'TechSupport', 'StreamingTV',
                       'StreamingMovies']
            for col in service:
                df[col] = df[col].replace({'No phone service': 'No', 'No internet service': 'No'})
            logging.info("Lunghezza df: ", len(df.columns))
            return df
        except Exception as e:
            logging.error("Errore nella Preelaborazione", e)
            raise e

class LabelEncoding(DataStrategy):
    def handle_data(self, df: pd.DataFrame) -> Union[pd.DataFrame, pd.Series]:
        try:
            df_cat = ['gender', 'Partner', 'Dependents', 'PhoneService', 'MultipleLines',
                      'InternetService', 'OnlineSecurity', 'OnlineBackup', 'DeviceProtection',
                      'TechSupport', 'StreamingTV',  'StreamingMovies', 'Contract',
                      'PaperlessBilling', 'PaymentMethod']
            lencod = LabelEncoder()
            for col in df_cat:
                df[col] = lencod.fit_transform(df[col])
            logging.info(df.head())
            return df
        except Exception as e:
            logging.error(e)
            raise e

class DataDivideStrategy(DataStrategy):
    def handle_data(self, df:pd.DataFrame) -> Union[pd.DataFrame, pd.Series]:
        try:
            X = df.drop('Churn', axis=1)
            y = df['Churn']
            X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=1)
            return X_train, X_test, y_train, y_test
        except Exception as e:
            logging.error("Errore nella suddivisione dei dati", e)
            raise e

Questo codice implementa una pipeline modulare di preelaborazione dei dati per l’apprendimento automatico. Include strategie per la preelaborazione dei dati, la codifica delle caratteristiche e l’elaborazione dei dati delle fasi di pulizia dei dati per la modellazione predittiva.

1. DataPreprocessing: Questa classe è responsabile della rimozione delle colonne indesiderate e della gestione dei valori mancanti (valori NA) nel dataset.

2. LabelEncoding: La classe LabelEncoding è progettata per codificare le variabili categoriche in un formato numerico con cui gli algoritmi di apprendimento automatico possano lavorare in modo efficace. Trasforma le categorie basate su testo in valori numerici.

3. DataDivideStrategy: Questa classe separa il dataset in variabili indipendenti (X) e variabili dipendenti (y). Quindi divide i dati in set di addestramento e di test.

Implementeremo questi passaggi uno per uno per preparare i nostri dati per i compiti di machine learning.

Queste strategie garantiscono che i dati siano strutturati e formattati correttamente per l’addestramento e la valutazione del modello.

Crea data_cleaning.py nella cartella steps.

import pandas come pdimport numpy come npfrom src.clean_data import DataPreprocessing, DataDivideStrategy, LabelEncodingimport loggingfrom typing_extensions import Annotatedfrom typing import Tuplefrom zenml import step# Definire un passo ZenML per la pulizia e la pre-elaborazione dei datidef cleaning_data(df: pd.DataFrame) -> Tuple[    Annotated[pd.DataFrame, "X_train"],    Annotated[pd.DataFrame, "X_test"],    Annotated[pd.Series, "y_train"],    Annotated[pd.Series, "y_test"],]:    try:        # Istanziare la strategia di pre-elaborazione dei dati        data_preprocessing = DataPreprocessing()                # Applicare la pre-elaborazione dei dati al DataFrame di input        data = data_preprocessing.handle_data(df)                  # Istanziare la strategia di codifica delle etichette        feature_encode = LabelEncoding()                # Applicare la codifica delle etichette ai dati pre-elaborati        df_encoded = feature_encode.handle_data(data)                  # Registrare informazioni sulle colonne del DataFrame        logging.info(df_encoded.columns)        logging.info("Colonne:", len(df_encoded))                # Istanziare la strategia di divisione dei dati        split_data = DataDivideStrategy()                # Dividere i dati codificati in set di addestramento e test        X_train, X_test, y_train, y_test = split_data.handle_data(df_encoded)                # Restituire i dati divisi come una tupla        return X_train, X_test, y_train, y_test    except Exception as e:        # Gestire e registrare gli errori che si verificano durante la pulizia dei dati        logging.error("Errore nel passaggio di pulizia dei dati", e)        raise e

In questo passaggio, abbiamo implementato le strategie che abbiamo creato in clean_data.py.

Implementiamo questo passaggio in training_pipeline.py

from zenml import pipeline#importare passaggi from steps.ingest_data import ingest_dffrom steps.data_cleaning import cleaning_dataimport logging# Definire un'interfaccia ZenML chiamata training_pipeline.@pipeline(enable_cache=False)def train_pipeline(data_path:str):    '''    Pipeline dati per addestrare il modello.    '''    df = ingest_df(data_path=data_path)           X_train, X_test, y_train, y_test = cleaning_data(df=df)

Ecco fatto; abbiamo completato il passaggio di pre-elaborazione dei dati nella pipeline di addestramento.

Addestramento del modello

Ora, stiamo per costruire il modello per questo progetto. Qui, stiamo predendo un problema di classificazione binaria. Possiamo utilizzare la regressione logistica. Il nostro focus non sarà sull’accuratezza del modello. Sarà basato sulla parte di MLOps.

Per coloro che non conoscono la regressione logistica, potete leggere di più su di essa qui. Implementeremo gli stessi passaggi che abbiamo fatto nel passaggio di pre-elaborazione dei dati. Per prima cosa, creeremo un file training_model.py nella cartella src.

import pandas come pdfrom sklearn.linear_model import LogisticRegressionfrom abc import ABC, abstractmethodimport logging# Classe astratta modelclass Model(ABC):    @abstractmethod    def train(self,X_train:pd.DataFrame,y_train:pd.Series):        "" "Addestra il modello sui dati forniti" ""        pass    class LogisticReg(Model):    "" "Implementazione del modello di regressione logistica" ""    def train(self, X_train: pd.DataFrame, y_train: pd.Series):        "" "Formazione del modello Argomenti:" ""        logistic_reg = LogisticRegression()        logistic_reg.fit(X_train,y_train)        return logistic_reg

Definiamo una classe Model astratta con un metodo ‘train’ che tutti i modelli devono implementare. La classe LogisticReg è un’implementazione specifica che utilizza la regressione logistica. Il passaggio successivo consiste nella configurazione di un file denominato config.py nella cartella steps. Crea un file denominato config.py nella cartella steps.

Configurazione dei parametri del modello

from zenml.steps import BaseParameters"""Questo file viene utilizzato per configurare e specificare vari parametri relativi ai modelli di machine learning e al processo di addestramento"""class ModelName(BaseParameters):    "" "Configurazioni del modello" ""    model_name: str = "regressione logistica"

Nel file chiamato config.py, all’interno della cartella steps, si sta configurando i parametri relativi al modello di machine learning. Si crea una classe ModelName che eredita da BaseParameters per specificare il nome del modello. Questo rende facile cambiare il tipo di modello.

import logging
import pandas as pd
from src.training_model import LogisticReg
from zenml import step
from .config import ModelName

# Definire uno step chiamato train_model
@step(enable_cache=False)
def train_model(X_train: pd.DataFrame, y_train: pd.Series, config: ModelName):
    """
    Allenamento dei dati basato sul modello configurato
    """
    try:
        model = None
        if config == "regressione logistica":
            model = LogisticReg()
        else:
            raise ValueError("Il nome del modello non è supportato")

        trained_model = model.train(X_train=X_train, y_train=y_train)
        return trained_model

    except Exception as e:
        logging.error("Errore nell'allenamento del modello", e)
        raise e

Nel file chiamato model_train.py nella cartella degli step, definisci uno step chiamato train_model utilizzando ZenML. Lo scopo di questo step è quello di allenare un modello di machine learning basato sul nome del modello in ModelName.

Nel programma

Verifica il nome del modello configurato. Se è “regressione logistica”, creiamo un’istanza del modello LogisticReg e lo alleniamo con i dati di allenamento forniti (X_train e y_train). Se il nome del modello non è supportato, viene generato un errore. Eventuali errori durante questo processo vengono registrati e viene generato l’errore.

Dopo questo, implementiamo questo step in training_pipeline.py

from zenml import pipeline
from steps.ingest_data import ingest_df
from steps.data_cleaning import cleaning_data
from steps.model_train import train_model
import logging

# Definire una pipeline ZenML chiamata training_pipeline.
@pipeline(enable_cache=False)
def train_pipeline(data_path: str):
    '''
    Pipeline di dati per l'addestramento del modello.
    '''
    # Step per l'ingestione dei dati: restituisce i dati.
    df = ingest_df(data_path=data_path)
    # Step per la pulizia dei dati.
    X_train, X_test, y_train, y_test = cleaning_data(df=df)
    # Addestramento del modello.
    model = train_model(X_train=X_train, y_train=y_train)

Ora, abbiamo implementato lo step train_model nella pipeline. Quindi, lo step model_train.py è completato.

Valutazione del modello

In questo passaggio, valuteremo l’efficienza del nostro modello. Per farlo, controlleremo il punteggio di accuratezza nella previsione dei dati di testing. Pertanto, prima creeremo le strategie che utilizzeremo nella pipeline.

Crea un file chiamato evaluate_model.py nella cartella src.

import logging
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score
from abc import ABC, abstractmethod
import numpy as np

# Classe astratta per la valutazione del modello
class Evaluate(ABC):
    @abstractmethod
    def evaluate_model(self, y_true: np.ndarray, y_pred: np.ndarray) -> float:
        """
        Metodo astratto per valutare le prestazioni di un modello di machine learning.
        Args:
            y_true (np.ndarray): Etichette vere.
            y_pred (np.ndarray): Etichette previste.
        Returns:
            float: Risultato della valutazione.
        """
        pass

# Classe per il calcolo del punteggio di accuratezza
class Accuracy_score(Evaluate):
    """
    Calcola e restituisce il punteggio di accuratezza per le previsioni di un modello.
    """
    def evaluate_model(self, y_true: np.ndarray, y_pred: np.ndarray) -> float:
        try:
            accuracy_scr = accuracy_score(y_true=y_true, y_pred=y_pred) * 100
            logging.info("Punteggio di accuratezza:", accuracy_scr)
            return accuracy_scr
        except Exception as e:
            logging.error("Errore nella valutazione dell'accuratezza del modello", e)
            raise e

# Classe per il calcolo del punteggio di precisione
class Precision_Score(Evaluate):
    def evaluate_model(self, y_true: np.ndarray, y_pred: np.ndarray) -> float:
        """
        Genera e restituisce un punteggio di precisione per le previsioni di un modello.
        """
        try:
            precision = precision_score(y_true=y_true, y_pred=y_pred)
            logging.info("Punteggio di precisione:", precision)
            return float(precision)
        except Exception as e:
            logging.error("Errore nel calcolo del punteggio di precisione", e)
            raise e

# Classe per il calcolo del punteggio F1
class F1_Score(Evaluate):
    def evaluate_model(self, y_true: np.ndarray, y_pred: np.ndarray):
        """
        Genera e restituisce un punteggio F1 per le previsioni di un modello.
        """
        try:
            f1_scr = f1_score(y_pred=y_pred, y_true=y_true)
            logging.info("Punteggio F1:", f1_scr)
            return f1_scr
        except Exception as e:
            logging.error("Errore nel calcolo del punteggio F1", e)
            raise e

Adesso che abbiamo creato le strategie di valutazione, le useremo per valutare il modello. Implementiamo il codice nel file step evaluate_model.py nella cartella steps. Qui, il punteggio di richiamo (recall score), il punteggio di precisione (accuracy score) e il punteggio di precisione (precision score) sono le strategie che stiamo usando come metriche per valutare il modello.

Implementiamo queste strategie passo dopo passo. Crea un file chiamato evaluation.py nella cartella steps:

import logging
import pandas as pd
import numpy as np
from zenml import step
from src.evaluate_model import ClassificationReport, ConfusionMatrix, Accuracy_score
from typing import Tuple
from typing_extensions import Annotated
from sklearn.base import ClassifierMixin

@step(enable_cache=False)
def evaluate_model(
    model: ClassifierMixin,
    X_test: pd.DataFrame,
    y_test: pd.Series
) -> Tuple[
    Annotated[np.ndarray,"matrice_di_confusione"],
    Annotated[str,"classification_report"],
    Annotated[float,"accuracy_score"],
    Annotated[float,"precision_score"],
    Annotated[float,"recall_score"]
]:
    """
    Valuta le prestazioni di un modello di machine learning utilizzando metriche comuni.
    """
    try:
        y_pred = model.predict(X_test)
        precision_score_class = Precision_Score()
        precision_score = precision_score_class.evaluate_model(y_pred=y_pred,y_true=y_test)
        mlflow.log_metric("Precision_score ",precision_score)
        accuracy_score_class = Accuracy_score()
        accuracy_score = accuracy_score_class.evaluate_model(y_true=y_test, y_pred=y_pred)
        logging.info("accuracy_score:",accuracy_score)
        return accuracy_score, precision_score
    except Exception as e:
        logging.error("Errore nella valutazione del modello",e)
        raise e

Ora, implementiamo questo passo nella pipeline. Aggiorna il file training_pipeline.py:

Questo codice definisce un passo evaluate_model in una pipeline di machine learning. Prende un modello di classificazione addestrato (model), dei dati di test indipendenti (X_test) e le etichette vere per i dati di test (y_test) come input. Valuta quindi le prestazioni del modello utilizzando metriche di classificazione comuni e restituisce i risultati, come il precision_score e l’accuracy_score.

Ora, implementiamo questo passo nella pipeline. Aggiorna il file training_pipeline.py:

from zenml import pipeline
from steps.ingest_data import ingest_df
from steps.data_cleaning import cleaning_data
from steps.model_train import train_model
from steps.evaluation import evaluate_model
import logging

# Definisci una pipeline ZenML chiamata training_pipeline.
@pipeline(enable_cache=False)
def train_pipeline(data_path:str):
    '''
    Pipeline dei dati per addestrare il modello.
    Args:
        data_path (str): Il percorso dei dati da importare.
    '''
    # Passo per importare i dati: restituisce i dati.
    df = ingest_df(data_path=data_path)
    # Passo per pulire i dati.
    X_train, X_test, y_train, y_test = cleaning_data(df=df)
    # Addestramento del modello
    model = train_model(X_train=X_train,y_train=y_train)
    # Metriche di valutazione dei dati
    accuracy_score, precision_score = evaluate_model(model=model,X_test=X_test, y_test=y_test)

Ecco fatto. Ora abbiamo completato la pipeline di addestramento. Esegui

python run_pipeline.py

Nel terminale. Se viene eseguito correttamente. Ora che abbiamo completato l’esecuzione di una pipeline di addestramento in locale, apparirà così:

Cos’è un Tracker di Esperimenti?

Un tracker di esperimenti è uno strumento nel machine learning utilizzato per registrare, monitorare e gestire vari esperimenti nel processo di sviluppo del machine learning.

I data scientist sperimentano con modelli diversi per ottenere i migliori risultati. Quindi, è necessario tenere traccia dei dati e utilizzare modelli diversi. Sarebbe molto difficile farlo manualmente utilizzando un foglio di Excel.

MLflow

MLflow è uno strumento prezioso per tenere traccia e gestire in modo efficiente gli esperimenti nel machine learning. Automatizza il tracciamento degli esperimenti, il monitoraggio delle iterazioni del modello e dei dati associati. Ciò semplifica il processo di sviluppo del modello e fornisce un’interfaccia user-friendly per visualizzare i risultati.

L’integrazione di MLflow con ZenML migliora la robustezza e la gestione degli esperimenti all’interno del framework delle operazioni di machine learning.

Per configurare MLflow con ZenML, seguire questi passaggi:

  1. Installare l’integrazione MLflow:
    1. Utilizzare il seguente comando per installare l’integrazione MLflow:
zenml integration install mlflow -y

2. Registrare il tracker di esperimenti MLflow:

Registrare un tracker di esperimenti in MLflow utilizzando questo comando:

zenml experiment-tracker register mlflow_tracker --flavor=mlflow

3. Registrare uno Stack:

In ZenML, uno Stack è una collezione di componenti che definiscono i compiti all’interno del tuo flusso di lavoro di apprendimento automatico. Aiuta ad organizzare e gestire efficacemente le fasi di un pipeline di apprendimento automatico. Registrare uno Stack con:

Puoi trovare ulteriori dettagli nella documentazione.

zenml model-deployer register mlflow --flavor=mlflowzenml stack register mlflow_stack -a default -o default -d mlflow -e mlflow_tracker --set

Questo associa il tuo Stack con impostazioni specifiche per la memorizzazione degli artefatti, gli orchestratori, i target di distribuzione e il tracciamento degli esperimenti.

4. Visualizzare i dettagli dello Stack:

Puoi visualizzare i componenti del tuo Stack utilizzando:

zenml stack describe

Questo mostra i componenti associati allo Stack “mlflow_tracker”.

Ora, implementiamo un tracker di esperimenti nel modello di addestramento ed valutiamo il modello:

Puoi vedere il nome dei componenti come mlflow_tracker.

Configurare il tracker di esperimenti ZenML

Prima di tutto, inizia ad aggiornare il train_model.py:

import loggingimport mlflowimport pandas as pdfrom src.training_model import LogisticRegfrom sklearn.base import ClassifierMixinfrom zenml import stepfrom .config import ModelName#import from zenml.client import Client# Ottieni il tracker di esperimenti dello Stack attivoexperiment_tracker = Client().active_stack.experiment_tracker# Definisci uno step chiamato train_model@step(experiment_tracker = experiment_tracker.name,enable_cache=False)def train_model(    X_train:pd.DataFrame,    y_train:pd.Series,    config:ModelName    ) -> ClassifierMixin:    """    Addestra i dati in base al modello configurato    Args:        X_train: pd.DataFrame = Dati di addestramento indipendenti        y_train: pd.Series = Dati di addestramento dipendenti.            """    try:        model = None        if config.model_name == "logistic regression":            #Registrazione automatica di punteggi, modello ecc...            mlflow.sklearn.autolog()            model = LogisticReg()        else:            raise ValueError("Nome del modello non supportato")                trained_model = model.train(X_train=X_train,y_train=y_train)        logging.info("Addestramento del modello completato.")        return trained_model        except Exception as e:        logging.error("Errore nel passaggio addestramento del modello",e)        raise e

In questo codice, configuriamo il tracker di esperimenti utilizzando mlflow.sklearn.autolog(), che registra automaticamente tutti i dettagli sul modello, semplificando il monitoraggio e l’analisi degli esperimenti.

In evaluation.py

from zenml.client import Clientexperiment_tracker = Client().active_stack.experiment_tracker@step(experiment_tracker=experiment_tracker.name, enable_cache = False)

Esecuzione della pipeline

Aggiorna lo script run_pipeline.py come segue:

from pipelines.training_pipeline import train_pipelinefrom zenml.client import Clientif __name__ == '__main__':    #Stampa l'URI del tracker di esperimenti    print(Client().active_stack.experiment_tracker.get_tracking_uri())    #Esegui la pipeline    train_pipeline(data_path="/mnt/e/Customer_churn/data/WA_Fn-UseC_-Telco-Customer-Churn.csv")

Copialo e incollalo in questo comando.

mlflow ui --backend-store-uri "<--uri in cima a "file:/home/ "

Esplora i Tuoi Esperimenti

Fai clic sul link generato dal comando sopra per aprire l’interfaccia utente di MLflow. Qui troverai un tesoro di informazioni:

  • Pipelines: Accedi facilmente a tutte le pipeline che hai eseguito.

  • Dettagli del Modello: Fai clic su una pipeline per scoprire ogni dettaglio sul tuo modello.
  • Metriche: Approfondisci la sezione delle metriche per visualizzare le prestazioni del tuo modello.

Ora puoi conquistare il tracciamento degli esperimenti di machine learning con ZenML e MLflow!

Deployment

Nella sezione successiva, procederemo con il deployment di questo modello. Devi conoscere questi concetti:

a). Pipeline di Deployment Continuo

Questa pipeline automatizza il processo di deployment del modello. Una volta che un modello supera i criteri di valutazione, viene automaticamente distribuito in un ambiente di produzione. Ad esempio, inizia con la pre-elaborazione dei dati, la pulizia dei dati, l’addestramento dei dati, la valutazione del modello, ecc.

b). Pipeline di Deployment Inferenziale

La Pipeline di Deployment Inferenziale si concentra sul deployment di modelli di machine learning per inferenze in tempo reale o a batch. La Pipeline di Deployment Inferenziale si specializza nel deployment di modelli per effettuare previsioni in un ambiente di produzione. Ad esempio, configura un endpoint API in cui gli utenti possono inviare testo. Garantisce la disponibilità e la scalabilità del modello e ne monitora la performance in tempo reale. Queste pipeline sono importanti per mantenere l’efficienza ed efficacia dei sistemi di machine learning. Ora, procediamo con l’implementazione della pipeline continua.

Crea un file chiamato deployment_pipeline.py nella cartella delle pipeline.

import numpy as npimport jsonimport loggingimport pandas as pdfrom zenml import pipeline, stepfrom zenml.config import DockerSettingsfrom zenml.constants import DEFAULT_SERVICE_START_STOP_TIMEOUTfrom zenml.integrations.constants import MLFLOWfrom zenml.integrations.mlflow.model_deployers.mlflow_model_deployer import (    MLFlowModelDeployer,)from zenml.integrations.mlflow.services import MLFlowDeploymentServicefrom zenml.integrations.mlflow.steps import mlflow_model_deployer_stepfrom zenml.steps import BaseParameters, Outputfrom src.clean_data import FeatureEncodingfrom .utils import get_data_for_testfrom steps.data_cleaning import cleaning_datafrom steps.evaluation import evaluate_modelfrom steps.ingest_data import ingest_df# Definire le impostazioni di Docker con integrazione MLflowdocker_settings = DockerSettings(required_integrations = {MLFLOW})# Definire la classe per la configurazione del trigger di deploymentclass DeploymentTriggerConfig(BaseParameters):    min_accuracy:float = 0.92@step def deployment_trigger(    accuracy: float,    config: DeploymentTriggerConfig,):    """    Triggera il deployment solo se la precisione è maggiore o uguale alla precisione minima.    Args:        accuracy: precisione del modello.        config: valore di soglia di precisione minima.    """    try:        return accuracy >= config.min_accuracy    except Exception as e:        logging.error("Errore nel trigger di deployment",e)        raise e# Definire una pipeline continua@pipeline(enable_cache=False,settings={"docker":docker_settings})def continuous_deployment_pipeline(    data_path:str,    min_accuracy:float = 0.92,    workers: int = 1,    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT):      df = ingest_df(data_path=data_path)    X_train, X_test, y_train, y_test = cleaning_data(df=df)    model = train_model(X_train=X_train, y_train=y_train)    accuracy_score, precision_score = evaluate_model(model=model, X_test=X_test, y_test=y_test)    decisione_deploy = deployment_trigger(accuracy=accuracy_score)    mlflow_model_deployer_step(        model=model,        deploy_decision = decisione_deploy,        workers = workers,        timeout = timeout    )

Framework ZenML per il Progetto di Machine Learning

Questo codice definisce un deployment continuo per un progetto di machine learning utilizzando il Framework ZenML.

1. Importare le librerie necessarie: Importare le librerie necessarie per il deployment del modello.

2. Impostazioni di Docker: Configurando le impostazioni di Docker da utilizzare con MLflow, Docker aiuta a confezionare ed eseguire in modo coerente questi modelli.

3. DeploymentTriggerConfig: È la classe in cui viene configurata la soglia di accuratezza minima per il rilascio di un modello.

4. deployment_trigger: Questo passaggio restituirà se l’accuratezza del modello supera l’accuratezza minima.

5. continuous_deployment_pipeline: Questa pipeline è composta da diversi passaggi: acquisizione dei dati, pulizia dei dati, addestramento del modello e valutazione del modello. E il modello verrà rilasciato solo se raggiunge la soglia di accuratezza minima.

Successivamente, procederemo all’implementazione della pipeline di inferenza in deployment_pipeline.py

import loggingimport pandas as pdfrom zenml.steps import BaseParameters, Outputfrom zenml.integrations.mlflow.model_deployers.mlflow_model_deployer import MLFlowModelDeployerfrom zenml.integrations.mlflow.services import MLFlowDeploymentServiceclass MLFlowDeploymentLoaderStepParameters(BaseParameters):    pipeline_name: str    step_name: str    running: bool = True@step(enable_cache=False)def dynamic_importer() -> str:    data = get_data_for_test()    return data@step(enable_cache=False)def prediction_service_loader(    pipeline_name: str,    pipeline_step_name: str,    running: bool = True,    model_name: str = "model",) -> MLFlowDeploymentService:    model_deployer = MLFlowModelDeployer.get_active_model_deployer()    existing_services = model_deployer.find_model_server(        pipeline_name=pipeline_name,        pipeline_step_name=pipeline_step_name,        model_name=model_name,        running=running,    )    if not existing_services:        raise RuntimeError(            f"Nessun servizio di previsione di MLflow distribuito dal passaggio"            f"{pipeline_step_name} nella pipeline {pipeline_name}"            f"per il modello '{model_name}' è attualmente in esecuzione."        )    return existing_services[0]@stepdef predictor(service: MLFlowDeploymentService, data: str) -> np.ndarray:    service.start(timeout=10)    data = json.loads(data)    prediction = service.predict(data)    return prediction@pipeline(enable_cache=False, settings={"docker": docker_settings})def inference_pipeline(pipeline_name: str, pipeline_step_name: str):    batch_data = dynamic_importer()    model_deployment_service = prediction_service_loader(        pipeline_name=pipeline_name,        pipeline_step_name=pipeline_step_name,        running=False,    )    prediction = predictor(service=model_deployment_service, data=batch_data)    return prediction

Codice che configura una pipeline per effettuare previsioni utilizzando un modello di machine learning distribuito attraverso MLflow. Importa i dati, carica il modello distribuito e lo utilizza per effettuare previsioni.

Dobbiamo creare la funzione get_data_for_test() in utils.py nella cartella delle pipeline. In questo modo possiamo gestire in modo più efficiente il nostro codice.

import loggingimport pandas as pd from src.clean_data import DataPreprocessing, LabelEncoding# Funzione per ottenere i dati a scopo di testdef get_data_for_test():    try:        df = pd.read_csv('./data/WA_Fn-UseC_-Telco-Customer-Churn.csv')        df = df.sample(n=100)        data_preprocessing = DataPreprocessing()        data = data_preprocessing.handle_data(df)                  # Istanza strategia feature encoding        label_encode = LabelEncoding()        df_encoded = label_encode.handle_data(data)         df_encoded.drop(['Churn'],axis=1,inplace=True)        logging.info(df_encoded.columns)        result = df_encoded.to_json(orient="split")        return result    except Exception as e:        logging.error("e")        raise e

Ora, implementiamo la pipeline che abbiamo creato per distribuire il modello e fare previsioni sul modello distribuito.

Crea il file run_deployment.py nella directory del progetto:

import click  # Per gestire gli argomenti da riga di comandoimport logging  from typing import castfrom rich import print  # Per la formattazione dell'output della console# Importa le pipeline per il rilascio e l'inferenza dalle pipeline.deployment_pipeline import (continuous_deployment_pipeline, inference_pipeline)# Importa le utilità e i componenti MLflowfrom zenml.integrations.mlflow.mlflow_utils import get_tracking_urifrom zenml.integrations.mlflow.model_deployers.mlflow_model_deployer import ( MLFlowModelDeployer)from zenml.integrations.mlflow.services import MLFlowDeploymentService# Definire costanti per diverse configurazioni: DEPLOY, PREDICT, DEPLOY_AND_PREDICTDEPLOY = "deploy"PREDICT = "predict"DEPLOY_AND_PREDICT = "deploy_and_predict"# Definisci una funzione principale che utilizza Click per gestire gli argomenti da riga di [email protected]()@click.option(    "--config",    "-c",    type=click.Choice([DEPLOY, PREDICT, DEPLOY_AND_PREDICT]),    default=DEPLOY_AND_PREDICT,    help="Opzionalmente puoi scegliere di eseguire solo la pipeline di rilascio "    "per addestrare e distribuire un modello (`deploy`), o solo "    "eseguire una previsione sul modello distribuito "    "(`predict`). Di default, entrambi saranno eseguiti "    "(`deploy_and_predict`).",)@click.option(    "--min-accuracy",    default=0.92,    help="Accuratezza minima richiesta per distribuire il modello",)def run_main(config:str, min_accuracy:float ):    # Ottieni il componente attivo di distribuzione del modello MLFlow    mlflow_model_deployer_component = MLFlowModelDeployer.get_active_model_deployer()        # Determina se l'utente desidera distribuire un modello (deploy), fare previsioni (predict) o entrambi (deploy_and_predict)    deploy = config == DEPLOY or config == DEPLOY_AND_PREDICT    predict = config == PREDICT or config == DEPLOY_AND_PREDICT        # Se viene richiesto il rilascio di un modello:    if deploy:        continuous_deployment_pipeline(            data_path='/mnt/e/Customer_churn/data/WA_Fn-UseC_-Telco-Customer-Churn.csv',            min_accuracy=min_accuracy,            workers=3,            timeout=60        )        # Se viene richiesta la previsione:    if predict:        # Inizializza l'esecuzione di una pipeline di inferenza        inference_pipeline(            pipeline_name="continuous_deployment_pipeline",            pipeline_step_name="mlflow_model_deployer_step",        )        # Stampa le istruzioni per visualizzare le esecuzioni degli esperimenti nell'interfaccia utente di MLflow    print(        "Puoi eseguire:\n "        f"[italic green]    mlflow ui --backend-store-uri '{get_tracking_uri()}"        "[/italic green]\n ...per ispezionare le esecuzioni degli esperimenti all'interno di MLflow"        " UI.\nPuoi trovare le tue esecuzioni monitorate all'interno "        "dell'esperimento `mlflow_example_pipeline`. Lì sarai in grado di "        "confrontare due o più esecuzioni.\n\n"    )        # Ottieni i servizi esistenti con lo stesso nome di pipeline, nome di passaggio e nome di modello    existing_services = mlflow_model_deployer_component.find_model_server(        pipeline_name = "continuous_deployment_pipeline",        pipeline_step_name = "mlflow_model_deployer_step",    )        # Controlla lo stato del server di previsione:    if existing_services:        service = cast(MLFlowDeploymentService, existing_services[0])        if service.is_running:            print(                f"Il server di previsione di MLflow è in esecuzione in locale come servizio"                f"di processo daemon e accetta richieste di inferenza a: \n"                f"     {service.prediction_url}\n"                f"Per interrompere il servizio, eseguire"                f"[italic green] zenml model-deployer models delete"                f"{str(service.uuid)}'[/italic green]."            )        elif service.is_failed:            print(                f"Il server di previsione di MLflow è in uno stato di errore: \n"                f" Stato precedente: '{service.status.state.value}'\n"                f" Errore precedente: '{service.status.last_error}'"            )    else:        print(            "Nessun server di previsione di MLflow è attualmente in esecuzione. La distribuzione"            "dei modelli deve essere eseguita prima per addestrare un modello e distribuirlo. Esegui"            "lo stesso comando con l'argomento '--deploy' per distribuire un modello."        )        # Punto di ingresso: se lo script viene eseguito direttamente, esegui la funzione principaleif __name__ == "__main__":    run_main()

Questo codice è uno script da riga di comando per gestire e distribuire il modello di machine learning utilizzando MLFlow e ZenMl.

Ora, procediamo con la distribuzione del modello.

Esegui questo comando sul tuo terminale.

python run_deployment.py --config deploy

Ora abbiamo distribuito il nostro modello. La pipeline verrà eseguita con successo e potrai visualizzarla nella dashboard di zenml.

python run_deployment.py --config predict

Inizio del processo di previsione

Ora, il nostro server di previsione di MLFlow è in esecuzione.

Abbiamo bisogno di un’app web per inserire i dati e visualizzare i risultati. Ti starai chiedendo perché dobbiamo creare un’app web da zero.

In realtà no. Useremo Streamlit, che è un framework frontend open-source che aiuta nella creazione rapida e semplice di un’app web frontend per il nostro modello di machine learning.

Installare la libreria

pip install streamlit

Crea un file chiamato streamlit_app.py nella cartella del tuo progetto.

import jsonimport loggingimport numpy as npimport pandas as pdimport streamlit as stfrom PIL import Imagefrom pipelines.deployment_pipeline import prediction_service_loaderfrom run_deployment import maindef main():    st.title("Pipeline di soddisfazione del cliente end-to-end con ZenML")       st.markdown(        """     #### Descrizione del problema      L'obiettivo qui è prevedere il punteggio di soddisfazione del cliente per un ordine dato sulla base delle caratteristiche come lo stato dell'ordine, il prezzo, il pagamento, ecc. Utilizzeremo [ZenML](https://zenml.io/) per creare una pipeline pronta per la produzione per prevedere il punteggio di soddisfazione del cliente per il prossimo ordine o acquisto.    """    )       st.markdown(        """     Sopra c'è una figura dell'intera pipeline, prima acquisiamo i dati, li puliamo, addestriamo il modello, valutiamo il modello e se cambia la fonte dati o cambiano i valori degli iperparametri, verrà attivato il deployment e (ri) addestra il modello e se il modello soddisfa i requisiti minimi di accuratezza, il modello verrà distribuito.    """    )    st.markdown(        """     #### Descrizione delle caratteristiche     Questa app è progettata per prevedere il punteggio di soddisfazione del cliente per un determinato cliente. Puoi inserire le caratteristiche del prodotto elencate di seguito e ottenere il punteggio di soddisfazione del cliente.     | Modelli        | Descrizione   |     | ------------- | -     |     | SeniorCitizen | Indica se il cliente è un anziano. |     | tenure   | Numero di mesi in cui il cliente è stato con l'azienda. |      | MonthlyCharges  | Costi mensili sostenuti dal cliente. |     | TotalCharges | Costi totali sostenuti dal cliente. |    | gender | Genere del cliente (Maschio: 1, Femmina: 0). |     | Partner | Se il cliente ha un partner (Sì: 1, No: 0). |    | Dependents |  Se il cliente ha persone a carico (Sì: 1, No: 0). |    | PhoneService  | Se il cliente ha il servizio telefonico (Sì: 1, No: 0). |       | MultipleLines | Se il cliente ha più linee (Sì: 1, No: 0). |     | InternetService | Tipo di servizio internet (No: 1, Altro: 0). |     | OnlineSecurity | Se il cliente ha il servizio di sicurezza online (Sì: 1, No: 0). |     | OnlineBackup | Se il cliente ha il servizio di backup online (Sì: 1, No: 0). |     | DeviceProtection | Se il cliente ha il servizio di protezione del dispositivo (Sì: 1, No: 0). |     | TechSupport  | Se il cliente ha il servizio di supporto tecnico (Sì: 1, No: 0). |    | StreamingTV  | Se il cliente ha il servizio di streaming TV (Sì: 1, No: 0). |    | StreamingMovies  | Se il cliente ha il servizio di streaming di film (Sì: 1, No: 0). |    | Contract | Tipo di contratto (Un anno: 1, Altro: 0). |    | PaperlessBilling | Se il cliente utilizza la fatturazione elettronica (Sì: 1, No: 0). |    | PaymentMethod  | Metodo di pagamento (Carta di credito: 1, Altro: 0). |    | Churn   | Se il cliente ha chiesto di annullare (Sì: 1, No: 0).   |        """    )        payment_options = {    2: "Assegno elettronico",    3: "Assegno inviato per posta",    1: "Bonifico bancario (automatico)",    0: "Carta di credito (automatico)"    }        contract = {        0: "Mensile",        2: "Biennale",        1: "Annuale"    }        def format_func(PaymentMethod):        return payment_options[PaymentMethod]            def format_func_contract(Contract):        return contract[Contract]        display = ("maschio", "femmina")    options = list(range(len(display)))    # Definisci le colonne dei dati con i relativi valori    SeniorCitizen = st.selectbox("Sei un anziano?",            options=[True, False],)    tenure = st.number_input("Durata")    MonthlyCharges = st.number_input("Costi mensili: ")    TotalCharges = st.number_input("Costi totali: ")    gender = st.radio("Genere:", options, format_func=lambda x: display[x])    Partner = st.radio("Hai un partner? ", options=[True, False])    Dependents = st.radio("A carico di qualcuno? ", options=[True, False])    PhoneService = st.radio("Hai il servizio telefonico? : ", options=[True, False])    MultipleLines = st.radio("Hai linee multiple? ", options=[True, False])    InternetService = st.radio("Hai sottoscritto un servizio Internet? ", options=[True, False])    OnlineSecurity = st.radio("Hai sottoscritto il servizio di sicurezza online? ", options=[True, False])    OnlineBackup = st.radio("Hai sottoscritto il servizio di backup online? ", options=[True, False])    DeviceProtection = st.radio("Hai sottoscritto solo il servizio di protezione del dispositivo?", options=[True, False])    TechSupport =st.radio("Hai sottoscritto il servizio di supporto tecnico? ", options=[True, False])    StreamingTV = st.radio("Hai sottoscritto il servizio di streaming TV", options=[True, False])    StreamingMovies = st.radio("Hai sottoscritto il servizio di streaming di film? ", options=[True, False])    Contract = st.radio("Durata del contratto: ", options=list(contract.keys()), format_func=format_func_contract)    PaperlessBilling = st.radio("Usi la fatturazione elettronica? ", options=[True, False])    PaymentMethod = st.selectbox("Metodo di pagamento:", options=list(payment_options.keys()), format_func=format_func)    # Puoi utilizzare PaymentMethod per ottenere il valore numerico del metodo di pagamento selezionato    if st.button("Prevedi"):        service = prediction_service_loader(        pipeline_name="continuous_deployment_pipeline",        pipeline_step_name="mlflow_model_deployer_step",        running=False,        )        if service is None:            st.write(                "Nessun servizio trovato. La pipeline verrà eseg

Questo codice definisce una StreamLit che fornirà un'interfaccia per la previsione dell'abbandono di clienti in una società di telecomunicazioni basata sui dati del cliente e sui dettagli demografici.

Gli utenti possono inserire le proprie informazioni attraverso un'interfaccia utente intuitiva e il codice utilizza un modello di machine learning addestrato (implementato con ZenML e MLflow) per effettuare previsioni.

Il risultato previsto viene quindi mostrato all'utente.

Ora esegui questo comando:

⚠️ assicurati che il tuo modello di previsione sia in esecuzione

streamlit run streamlit_app.py

Fai clic sul link.

Ecco fatto; abbiamo completato il nostro progetto.

Ecco fatto; abbiamo concluso con successo il nostro progetto di machine learning end-to-end, vedendo come i professionisti affrontano l'intero processo.

Conclusione

In questa completa esplorazione delle operazioni di machine learning (MLOps) attraverso lo sviluppo e la distribuzione di un modello di previsione dell'abbandono dei clienti, abbiamo assistito al potere trasformativo di MLOps nell'ottimizzare il ciclo di vita del machine learning. Dalla raccolta e preelaborazione dei dati all'addestramento, alla valutazione e alla distribuzione del modello, il nostro progetto mette in mostra il ruolo essenziale di MLOps nel colmare il divario tra sviluppo e produzione. Man mano che le organizzazioni fanno sempre più affidamento sulla presa di decisioni basata sui dati, le pratiche efficienti e scalabili dimostrate qui mettono in evidenza l'importanza fondamentale di MLOps nel garantire il successo delle applicazioni di machine learning.

Punti chiave

  • MLOps (Machine Learning Operations) è fondamentale per ottimizzare l'intero ciclo di vita del machine learning, garantendo operazioni efficienti, affidabili e scalabili.
  • ZenML e MLflow sono potenti framework che facilitano lo sviluppo, il monitoraggio e la distribuzione dei modelli di machine learning in applicazioni reali.
  • La corretta preelaborazione dei dati, inclusa la pulizia, la codifica e la divisione, è fondamentale per la creazione di modelli di machine learning robusti.
  • Le metriche di valutazione come l'accuratezza, la precisione, il richiamo e il punteggio F1 offrono una comprensione completa delle prestazioni del modello.
  • Gli strumenti di monitoraggio degli esperimenti come MLflow favoriscono la collaborazione e la gestione degli esperimenti nei progetti di data science.
  • I flussi di lavoro di distribuzione continua e di inferenza sono fondamentali per mantenere l'efficienza e la disponibilità del modello in ambienti di produzione.

Domande frequenti

I media mostrati in questo articolo non sono di proprietà di Analytics Vidhya e sono utilizzati a discrezione dell'autore.