Kafka Event Streaming AI e Automazione

Flusso di eventi Kafka, IA e automazione

Apache Kafka è emerso come un chiaro leader nell’architettura aziendale per il passaggio dai dati in stato di riposo (transazioni del database) allo event streaming. Ci sono molte presentazioni che spiegano come funziona Kafka e come scalare questa tecnologia (sia in locale che nel cloud). Costruire un microservizio che utilizza ChatGPT per consumare messaggi ed arricchirli, trasformarli e persistere i dati è la fase successiva di questo progetto. In questo esempio, consumeremo input da un dispositivo IoT (RaspberryPi) che invia una lettura di temperatura in formato JSON ogni pochi secondi.

Consumare un Messaggio

Ogni volta che un messaggio di evento Kafka viene prodotto (e registrato), un consumatore di microservizi Kafka è pronto per gestire ogni messaggio. Ho chiesto a ChatGPT di generare del codice Python e mi ha dato i fondamenti per sondare e leggere dal “topic” specificato. Quello che ho ottenuto è un ottimo punto di partenza per il consumo di un topic, una chiave e un payload JSON. Il codice creato da ChatGPT utilizza SQLAlchemy per persistere tutto questo in un database. Successivamente, ho voluto trasformare il payload JSON e utilizzare le regole di API Logic Server (ALS: un progetto open source su GitHub) per srotolare il JSON, validarne il contenuto, calcolare e produrre un nuovo set di messaggi basati sulla temperatura esterna eccessivamente alta o bassa rispetto ad un range specificato.

Nota: ChatGPT ha selezionato le librerie Confluent Kafka (e utilizza il loro contenitore Docker Kafka) – è possibile modificare il codice per utilizzare altre librerie Python per Kafka.

Modello di SQLAlchemy

Utilizzando API Logic Server (ALS: una piattaforma Python open-source), ci connettiamo a un database MySQL. ALS leggerà le tabelle e creerà un modello SQLAlchemy ORM, un’interfaccia utente react-admin, un’API OpenAPI(Swagger) safrs-JSON (apri Swagger) e un servizio web REST in esecuzione per ciascun endpoint ORM. La nuova tabella Temperature conterrà l’orario, l’ID del dispositivo IoT e la lettura della temperatura. Qui usiamo l’utilità da linea di comando di ALS per creare il modello ORM:

La classe generata da API Logic Server utilizzata per contenere i nostri valori Temperature.

Cambiamenti

Quindi, anziché salvare nuovamente il messaggio di consumo JSON di Kafka in un database SQL e attivare le regole per farlo, scopriamo il payload JSON (util.row_to_entity) e lo inseriamo nella tabella Temperature invece di salvarlo direttamente. Lasciamo che le regole dichiarative gestiscano ogni lettura di temperatura.

Quando il consumatore riceve il messaggio, lo aggiunge alla sessione, che attiverà la regola commit_event (sotto riportata).

Logica Dichiarativa: Produrre un Messaggio

Utilizzando API Logic Server (un framework di automazione costruito con SQLAlchemy, Flask e un motore di regole LogicBank simile ad un foglio elettronico: con formule, somme, conti, copia, vincoli, eventi, ecc), aggiungiamo una regola dichiarativa commit_event nell’entità ORM Temperature. Ogni volta che un messaggio è persistito nella tabella Temperature, viene richiamata la regola commit_event. Se la lettura della temperatura supera il valore di MAX_TEMP o è inferiore al valore di MIN_TEMP, invieremo un messaggio Kafka nel topic "TempRangeAlert". Aggiungiamo anche un vincolo per assicurarci di ricevere dati entro un range normale (da 32 a 132). Lasciamo che un altro consumatore di eventi gestisca il messaggio di allerta.

Produrre un messaggio di allerta solo se la lettura della temperatura è superiore a MAX_TEMP o inferiore a MIN_TEMP. Il vincolo verificherà il range di temperatura prima di richiamare l’evento di commit (si noti che le regole non sono mai ordinate e possono essere introdotte man mano che le specifiche cambiano).

TDD Behave Testing

Utilizzando TDD (Test Driven Development), è possibile scrivere un test Behave per inserire record direttamente nella tabella Temperature e poi verificare il valore di ritorno KafkaMessageSent. Behave inizia con una funzionalità (file .feature)/scenario. Per ogni scenario, scriviamo una corrispondente classe Python utilizzando i decoratori di Behave.

Definizione della Funzionalità

Classe TDD Python

Sommario

Utilizzando ChatGPT per generare il codice del messaggio Kafka sia per il Consumatore che per il Produttore sembra un buon punto di partenza. Installare Confluent Docker per Kafka. Utilizzando API Logic Server per le regole logiche dichiarative ci permette di aggiungere formule, vincoli ed eventi al normale flusso delle transazioni nel nostro database SQL e produrre (e trasformare) nuovi messaggi Kafka è una grande combinazione. ChatGPT e la logica dichiarativa sono il prossimo livello di “programmazione in coppia”.