Schema FastAPI per LLM SaaS Parte 2 — Celery e Pg-vector

Schema FastAPI per LLM SaaS Parte 2 Celery e Pg-vector

Questo post del blog fa parte della serie FastAPI + Supabase Template per LLM SaaS, basata sui concetti introdotti nella parte 1 (Autenticazione e Caricamento File).

FastAPI Template per LLM SaaS Parte 1 — Autenticazione e Caricamento File

La crescente popolarità di FastAPI tra gli sviluppatori Python è evidenziata per la sua semplicità e il supporto nativo di Swagger UI…

pub.towardsai.net

La maggior parte degli esempi di codice sono tratti da Quivr.

Worker Celery e Coda Messaggi per Processi a Lungo Termine

L’illustrazione di seguito rappresenta come collaborano i worker Celery e le code dei messaggi nell’ecosistema di FastAPI. Il processo inizia con FastAPI che invia compiti a un broker designato (in questo caso, Redis). Successivamente, i worker Celery recuperano e elaborano questi compiti all’interno di una coda di compiti distribuita, salvando i risultati nel Result backend (sempre Redis). Contemporaneamente, FastAPI può monitorare lo stato dei compiti e i risultati. Mentre l’esempio utilizza un’unica istanza di Redis sia per il broker che per il Result backend, è possibile utilizzare istanze separate se necessario.

Source: Sketch dell'autore

Per avviare il processo di sviluppo, è necessario avviare un’istanza di Redis utilizzando i seguenti comandi Docker:

# Preleva l'immagine più recente di Redisdocker pull redis:latest# Esegui un'istanza di Redisdocker run --name redis -d -p 6379:6379 redis:latest

Configura le variabili d’ambiente per il broker e il Result backend nel tuo progetto FastAPI:

# Istanza del broker - RedisCELERY_BROKER_URL=redis://localhost:6379/0# Result backend - RedisCELERY_RESULT_BACKEND=redis://localhost:6379/0

Crea un compito fittizio in main.py per i test:

from celery import Celeryimport time

celery = Celery(    __name__,    broker=os.getenv("CELERY_BROKER_URL"),    backend=os.getenv("CELERY_RESULT_BACKEND"),)@Celery.taskdef test():    import time    time.sleep(5)    return "Ciao, mi piace mangiare sedano!"

Torna al terminale e digita il comando celery (ipotizzando che tu abbia già installato celery nell’ambiente. Se non l’hai fatto, usa semplicemente pip install)

celery --app=main.celery worker --concurrency=1 --loglevel=DEBUG

Nota: se stai testando lo script su una macchina Windows, potrebbe essere necessario aggiungere ‘-P solo’ al comando per farlo funzionare nell’ambiente locale. Ciò non dovrebbe essere necessario per la produzione.

Vedrai qualcosa di simile a quanto segue.

-------------- celery@xxxx v5.2.7 (dawn-chorus)--- ***** ----- -- ******* ---- Windows-10-10.0.22621-SP0 2023-11-20 07:03:38- *** --- * --- - ** ---------- [config]- ** ---------- .> app:         main:0x22d23e16d70- ** ---------- .> transport:   redis://localhost:6379/0- ** ---------- .> results:     redis://localhost:6379/0- *** --- * --- .> concurrency: 4 (prefork)-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)--- ***** -----  -------------- [queues]                .> celery           exchange=celery(direct) key=celery

Ora, puoi aprire un altro terminale sulla base della directory di lavoro e utilizzare Python REPL per un test rapido.

(.venv) PS C:\Users\xxx\backend> pythonPython 3.10.11 (tags/v3.10.11:7d4cc5a, Apr  5 2023, 00:38:17) [MSC v.1929 64 bit (AMD64)] on win32Type "help", "copyright", "credits" or "license" for more information.>>> from main import app, celery, test>>> test.delay()<AsyncResult: 2ba428c1-5d82-4f37-aa89-5cef76b7a6eb>

Torna al terminale in cui hai in esecuzione il worker di Celery, dovresti osservare l’esecuzione dei task nel terminale del worker di Celery.

[2023-11-20 12:21:46,743: INFO/MainProcess] Task main.test[2ba428c1-5d82-4f37-aa89-5cef76b7a6eb] ricevuto[2023-11-20 12:21:51,754: INFO/MainProcess] Task main.test[2ba428c1-5d82-4f37-aa89-5cef76b7a6eb] completato in 5.014999999999418s: 'Ciao, mi piace mangiare il sedano!'

Caricamento di file e datastore di vettori (plugin pg-vector)

Partendo dal test di Celery, il caso d’uso effettivo prevede l’esecuzione di un task di background di Celery per incorporare un documento PDF e salvarlo in un datastore di vettori. Il processo prevede il caricamento del file in un bucket di archiviazione di Supabase e l’attivazione di un task di Celery per scaricarlo ed elaborarlo per il datastore di vettori.

Fonte: diagramma dell'autore

Tutto il processo è un po’ complicato. Inizialmente il file viene caricato nel bucket di archiviazione di Supabase. Quindi attiviamo un task di Celery per scaricare questo file ed elaborarlo per il datastore di vettori. Il processo prevede un Document Loader per convertire il formato originale del file in testo grezzo e uno Text Splitter per dividere il testo in blocchi (a causa del limite di dimensione di un singolo vettore sul datastore di vettori). Inoltre, aggiungeremo metadati per singoli blocchi di testo. Infine, i blocchi di testo verranno incorporati nei vettori e caricati nel datastore di vettori di Supabase (plugin pg-vector di postgres).

Tabelle SQL su Supabase

Innanzitutto, assicurarsi che siano state create due tabelle su Supabase per questa dimostrazione: (per ulteriori esempi di script sql, puoi fare riferimento a https://github.com/StanGirard/quivr/tree/main/scripts)

-- Creazione della tabella utenti X vettoriCREATE TABLE IF NOT EXISTS user_vectors (  user_id UUID,  vector_id UUID,  PRIMARY KEY (user_id, vector_id),  FOREIGN KEY (vector_id) REFERENCES vectors (id),  FOREIGN KEY (user_id) REFERENCES auth.users (id));-- Creazione dell'estensione vectorCREATE EXTENSION IF NOT EXISTS vector;-- Creazione della tabella vettoriCREATE TABLE IF NOT EXISTS vectors (    id UUID DEFAULT uuid_generate_v4() PRIMARY KEY,    content TEXT,    metadata JSONB,    embedding VECTOR(1536));

Definizione Routes ed Endpoints

Nel main.py, aggiungi un nuovo router chiamato ‘upload_router’.

from routes.upload_routes import upload_routerapp.include_router(upload_router)

Crea una nuova directory chiamata ‘routes’, e crea un file chiamato ‘upload_routes.py’

from fastapi.responses import JSONResponsefrom auth import AuthBearer, get_current_userfrom celery_worker import process_filefrom celery.result import AsyncResultfrom fastapi import APIRouter, Depends, HTTPException, Request, UploadFilefrom repository.files.upload_file import upload_file_storagefrom logger import get_loggerfrom models import UserIdentitylogger = get_logger(__name__)upload_router = APIRouter()@upload_router.get("/upload/healthz", tags=["Health"])async def healthz():    return {"status": "ok"}@upload_router.post("/upload", dependencies=[Depends(AuthBearer())], tags=["Upload"])async def upload_file(    request: Request,    uploadFile: UploadFile,    current_user: UserIdentity = Depends(get_current_user),):    file_content = await uploadFile.read()    filename_with_user_id = str(current_user.id) + "/" + str(uploadFile.filename)    logger.info(f"il nome del file è: {filename_with_user_id}")    try:        fileInStorage = upload_file_storage(file_content, filename_with_user_id)        logger.info(f"File {fileInStorage} caricato correttamente")    except Exception as e:        if "La risorsa esiste già" in str(e):            raise HTTPException(                status_code=403,                detail=f"Il file {uploadFile.filename} esiste già nell'archivio.",            )        else:            raise HTTPException(                status_code=500, detail="Impossibile caricare il file nell'archivio."            )    task = process_file.delay(        file_name=filename_with_user_id,        file_original_name=uploadFile.filename,        user_id=current_user.id,    )    return JSONResponse({"task_id": task.id})@upload_router.get("/upload/{task_id}", dependencies=[Depends(AuthBearer())], tags=["Upload"])def get_status(task_id: str):    task_result = AsyncResult(task_id)    result = {        "task_id": task_id,        "task_status": task_result.status    }    return JSONResponse(result)

Questo script definirà due endpoint in ‘upload_routes.py’ per l’upload di file e il controllo dello stato del task.

Fonte: Screenshot dell'autore

Vedrai che c’è un task di celery chiamato ‘process_file’ in /upload. Ora creiamo questo task in celery.

Worker e task di Celery

Innanzitutto, crea un file nella directory principale chiamato ‘celery_worker.py’.

import osfrom celery import Celeryimport asynciofrom utils.process_file import get_supabase_client,file_handlercelery = Celery(    __name__,    broker="redis://127.0.0.1:6379/0",    backend="redis://127.0.0.1:6379/0")@celery.task(name="process_file")def process_file(    file_name: str,    file_original_name: str,    user_id: str,):    supabase_client = get_supabase_client()    tmp_file_name = "tmp-file-"+file_name    tmp_file_name = tmp_file_name.replace("/", "_")        with open(tmp_file_name, "wb+") as file:        res = supabase_client.storage.from_("quivr").download(file_name)        file.write(res)        loop = asyncio.new_event_loop()        message = loop.run_until_complete(            file_handler(                file=tmp_file_name,                user_id=user_id,                file_original_name=file_original_name            )        )                file.close    os.remove(tmp_file_name)

Questo task ‘process_file’ (fai riferimento al diagramma di processo sopra) scarica il file, utilizza il file_handler per elaborare il file, quindi rimuove il file temporaneo al termine.

Gestione dei file e incorporazione

Per semplicità, puoi utilizzare lo script file_handler di seguito. Questo script ha un worker per eseguire tutte le incorporazioni. Puoi anche esaminare la base di codice di Quivr dove hanno un altro task condiviso per assegnare le incorporazioni a più worker.

# utils/process_file.py per elaborare il file caricatoimport osimport timefrom logger import get_loggerfrom repository.files.upload_file import DocumentSerializablefrom langchain.document_loaders import UnstructuredPDFLoaderfrom models.databases.supabase.supabase import SupabaseDBfrom supabase.client import Client, create_clientfrom langchain.vectorstores import SupabaseVectorStorefrom langchain.embeddings.openai import OpenAIEmbeddingsfrom langchain.text_splitter import RecursiveCharacterTextSplitterfrom dotenv import load_dotenvload_dotenv()logger = get_logger(__name__)def get_supabase_client() -> Client:    supabase_client: Client = create_client(        os.getenv("SUPABASE_URL"), os.getenv("SUPABASE_SERVICE_KEY")    )    return supabase_clientdef get_supabase_db() -> SupabaseDB:    supabase_client = get_supabase_client()    return SupabaseDB(supabase_client)def get_embeddings() -> OpenAIEmbeddings:    embeddings = OpenAIEmbeddings(        openai_api_key=os.getenv("OPENAI_API_KEY")    )  # pyright: ignore reportPrivateUsage=none    return embeddingsdef get_documents_vector_store() -> SupabaseVectorStore:    # settings = BrainSettings()  # pyright: ignore reportPrivateUsage=none    embeddings = get_embeddings()    supabase_client: Client = create_client(        os.getenv("SUPABASE_URL"), os.getenv("SUPABASE_SERVICE_KEY")    )    documents_vector_store = SupabaseVectorStore(        supabase_client, embeddings, table_name="vectors"    )    return documents_vector_storedef create_vector(doc):    documents_vector_store = get_documents_vector_store()    try:         sids = documents_vector_store.add_documents([doc])        if sids and len(sids) > 0:            return sids            except Exception as e:        logger.error(f"Errore nella creazione del vettore per il documento: {e}")        def create_user_vector(user_id, vector_id):    database = get_supabase_db()    response = (        database.db.table("user_vectors")        .insert(            {                "user_id": str(user_id),                "vector_id": str(vector_id),            }        )        .execute()    )    return response.data    def create_embedding_for_document(user_id, doc_with_metadata):    doc = DocumentSerializable.from_json(doc_with_metadata)    created_vector = create_vector(doc)    created_vector_id = created_vector[0]  # pyright: ignore reportPrivateUsage=none        create_user_vector(user_id, created_vector_id)    def compute_documents_from_pdf(file,loader):    loader = loader(file)    documents=[]    documents = loader.load()    # divide i documenti in frammenti    text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(        chunk_size=500, chunk_overlap=0    )    documents = text_splitter.split_documents(documents)    return documents        async def file_handler(    file: str,    file_original_name: str,    user_id,    loader_class=UnstructuredPDFLoader,   #classe del loader da Langchain):    dateshort = time.strftime("%Y%m%d")        documents = compute_documents_from_pdf(file,loader_class)    for doc in documents:  # pyright: ignore reportPrivateUsage=none        metadata = {            "file_name": file_original_name,            "date": dateshort        }        doc_with_metadata = DocumentSerializable(            page_content=doc.page_content, metadata=metadata        )        create_embedding_for_document(            user_id, doc_with_metadata.to_json()        )    return "Ciao, l'elaborazione è terminata!"

A solo scopo dimostrativo, qui sono testati solo file pdf. Per ulteriori formati di file, puoi fare riferimento alla base di codice di Quivr, dove viene utilizzata una classe File per elaborare una vasta gamma di formati di file.

Test end-to-end

Per testare questo, accendi sia il server Uvicorn (per FastAPI) che il server Celery.

uvicorn main:app --reload

celery -A celery_worker worker --loglevel=info --logfile=celery.log --concurrency=1 -P solo

— logfile (opzionale): può salvare un file di log di celery nella tua directory di lavoro

— concurrency (opzionale): imposta quanti worker vuoi avviare contemporaneamente

— P solo: ho bisogno di questo per far funzionare celery su un laptop Windows. Se lo esegui su Mac/Docker, probabilmente non ne hai bisogno.

Ecco uno snippet dal test dei punti finali.

Fonte: screenshot dell'autore
Fonte: screenshot dell'autore