Masterizzare l’integrazione dei dati dai sistemi SAP con l’ingegneria tempestiva

Massimizzare l'integrazione dei dati dai sistemi SAP con l'ingegneria tempestiva

Ingegnere edile che indaga il suo lavoro - Diffusione stabile

Introduzione

Nella nostra precedente pubblicazione, Dall’ingegneria dei dati all’ingegneria delle richieste, abbiamo dimostrato come utilizzare ChatGPT per risolvere compiti di preparazione dei dati. Oltre al buon feedback che abbiamo ricevuto, è stato sollevato un punto critico: l’ingegneria delle richieste può aiutare con compiti semplici, ma è davvero utile in un ambiente più impegnativo? Questo è un punto valido. Negli ultimi decenni, le architetture dei dati sono diventate sempre più diverse e complesse. A causa di questa complessità, gli ingegneri dei dati devono sempre di più integrare una varietà di fonti dati con cui non sono necessariamente familiarizzati. L’ingegneria delle richieste può aiutare in questo contesto?

Questo articolo esamina la questione basandosi su un caso d’uso reale della gestione delle risorse umane. Applichiamo l’apprendimento in pochi passi per introdurre un modello di dati SAP HCM a ChatGPT e analizzare le informazioni raccolte con Apache Spark. In questo modo, illustreremo come l’ingegneria delle richieste possa fornire valore anche in ambienti avanzati di ingegneria dei dati.

Informazioni sul caso aziendale

Un compito comune che ogni azienda VoAGI di grandi dimensioni deve svolgere è determinare il numero dei propri dipendenti e il loro assegnamento organizzativo in un determinato momento. I dati associati nel nostro scenario sono memorizzati in un sistema SAP HCM, che è una delle principali applicazioni per la gestione delle risorse umane in ambienti aziendali.

Per risolvere questo tipo di obiettivo, ogni ingegnere dei dati deve acquisire molte conoscenze legate all’attività, che sono strettamente interdipendenti con il modello di dati sottostante.

Questo articolo fornirà una guida passo passo per risolvere il problema aziendale descritto, creando codice PySpark che può essere utilizzato per creare il modello di dati e quindi la base per qualsiasi soluzione di reportistica.

Esempio di report PowerBi che mostra il conteggio del personale

Passo 1: Determinare quali informazioni sono necessarie

Una delle principali sfide della data science è selezionare le informazioni necessarie in base al caso d’uso aziendale e determinarne l’origine nei sistemi di origine. Per risolvere questo problema, dobbiamo fornire a chatgpt alcune conoscenze aziendali. A questo scopo, insegniamo a chatgpt alcune informazioni sulle tabelle di base di SAP HCM che possono essere trovate nel manuale di riferimento di SAP: Risorse umane | Portale di assistenza SAP combinandole con un record di esempio in formato CSV per ogni tabella.

In questo primo scenario, la nostra intenzione è segnalare tutti i dipendenti attivi in un determinato momento. Il risultato dovrebbe includere anche il numero di matricola dei dipendenti, il nome, lo stato e l’assegnazione organizzativa.

Per raccogliere tutte le informazioni necessarie, è necessario inferire uno schema del database a ChatGPT che includa set di dati di esempio e descrizioni dei campi utilizzando il prompting in pochi passi. Inizieremo propagando lo schema del database e alcuni dati di esempio a ChatGPT.

Tutti coloro che conoscono il modello di dati di SAP HCM dovrebbero essere familiari con il concetto di infotipi e tabelle trasparenti. L’infotipo contiene tutte le informazioni transazionali, mentre le tabelle trasparenti contengono le informazioni aziendali (dati di base) di ogni entità.

Per il seguente scenario, utilizzeremo GPT-4 di OpenAI per creare il codice di cui abbiamo bisogno. Iniziamo fornendo le informazioni di base sulla tabella a ChatGPT.

Prompt:Dato il seguente Tables1. Tabella PA0000 - Azioni dipendenteCampo;Chiave;Elemento dati;Dominio;Tipo dati;Lunghezza;Decimale;Descrizione breve;Tabella di controlloMANDT;;MANDT;MANDT;CLNT;3;0;Cliente;T000PERNR;;PERSNO;PERSNO;NUMC;8 ;0;Numero personale;PA0003SUBTY;;SUBTY;SUBTY;CHAR;4;0;Sottotipo;OBJPS;;OBJPS;OBJPS;CHAR;2;0;Identificazione oggetto...contenente i seguenti dataset di esempio:100;99992228;;;;9999-12-31T00:00:00.0000000;2023-07-01T00:00:00.0000000;0;2023-01-10T00:00:00.0000000;TRP0001;;;;;;;;;;;;;;10;3;;0;0100;99992229;;;;9999-12-31T00:00:00.0000000;2023-07-01T00:00:00.0000000;0;2023-03-29T00:00:00.0000000;TRP0001;;;;;;;;;;;;;;10;6;;0;02. Tabella T529U - Valori di statoCampo;Chiave;Elemento dati;Dominio;Tipo dati;Lunghezza;Decimali;Descrizione breve;Tabella di controlloMANDT;;MANDT;MANDT;CLNT;3;0;Cliente;*SPRSL;;SPRAS;SPRAS;LANG;1;0;Chiave lingua;T002STATN;;STATN;STATN;NUMC;1;0;Numero stato;...contenente i seguenti dataset di esempio:MANDT,SPRSL,STATN,STATV,TEXT1100,D,2,3,attivo100,D,2,0,cessato...

Passo 2: Unisci le tabelle di base necessarie e filtra solo i dipendenti attivi

Ora creiamo il codice per unire le tabelle di base:

Prompt: Dammi il codice pyspark per limitare le "Azioni dei dipendenti" a contenere solo dipendenti indicati come attivi in base ai corrispondenti "Valori di stato" per i testi in lingua inglese. Usa "structureVoAGI.PA0000" come fonte per la tabella PA0000 e "structureVoAGI.T529U" come fonte per la tabella T529U. Usa alias di colonna significativi.

Questo produrrà un codice PySpark abbastanza decente e ben formattato:

from pyspark.sql import SparkSessionfrom pyspark.sql.functions import col# Inizializzazione delle sessioni di Sparkspark = SparkSession.builder.appName("EmployeeActionsFilter").getOrCreate()# Lettura delle tabelle in DataFrame df_pa0000 = spark.read.table("structureVoAGI.PA0000")df_t529u = spark.read.table("structureVoAGI.T529U")# Filtraggio di T529U per la descrizione dello stato rilevante e la lingua df_active_status = df_t529u.filter(    (col("TEXT1") == "Active") &    (col("SPRSL") == "E"))# Unione di PA0000 con T529U filtrato in base al campo di stato STAT2 df_active_employees = df_pa0000.join(    df_active_status,    (df_pa0000.STAT2 == df_active_status.STATV) &    (df_active_status.STATN == 2)  # Supponendo che STATN=2 indichi lo stato di impiego dai dati forniti).select(    df_pa0000["PERNR"].alias("NumeroDipendente"),    df_pa0000["MASSN"].alias("TipoAzione"),    df_pa0000["MASSG"].alias("MotivoAzione"),    df_active_status["TEXT1"].alias("DescrizioneStato"))# Mostra il risultato df_active_employees.show() # Opzionalmente, puoi scrivere questo DataFrame in una destinazione o elaborarlo ulteriormente.

Nota che GPT-4 ha riconosciuto i criteri di unione per entrambe le tabelle, che si basano sulla colonna STAT2 della tabella PA0000 e sulla colonna STATV della tabella T529U che è la tabella trasparente corrispondente. Inoltre, il codice creato contiene le descrizioni aziendali come alias di colonna per migliorarne la leggibilità.

Passo 3: Crea una timeline per riflettere la storia dei dipendenti dell’azienda

Ora creeremo un DataFrame che contenga i valori delle date per il periodo che va dal 2020-01-01 al 2024-01-01 e uniremo tutti i dipendenti validi in base alla loro data di ingresso (BEGDA) e possibile data di uscita (ENDDA) per creare una timeline artificiale a cui unire i dipendenti.

PromptIn Pyspark: Crea un dataframe contenente una riga per ogni data che inizia dal 2020-01-01 e termina al 2024-01-01. La colonna Date dovrebbe essere formattata come data. Successivamente, unisci tutti i dipendenti validi in base alla loro data di inizio e di fine. L'output dovrebbe contenere solo dipendenti attivi in base al loro stato di impiego.

from pyspark.sql import SparkSessionfrom pyspark.sql.functions import explode, sequence, to_date, lit, col# Avvia una sessione di Sparkspark = SparkSession.builder.appName("EmployeeTimeline").getOrCreate()# Definisci le date di inizio e finestart_date = to_date(lit("2020-01-01"))end_date = to_date(lit("2024-01-01"))# Crea un DataFrame con una riga per ogni datadate_df = (spark.range(1)          .select(explode(sequence(start_date, end_date)).alias("date")))# Carica le tabelle PA0000 e T529Upa0000_df = spark.read.table("structureVoAGI.PA0000")t529u_df = spark.read.table("structureVoAGI.T529U")# Filtra T529U per dipendenti 'Attivi' con chiave lingua = Eactive_t529u_df = t529u_df.filter((col("SPRSL") == "E") & (col("TEXT1") == "Active"))# Unisci PA0000 con T529U filtrato per ottenere dipendenti attiviactive_pa0000_df = pa0000_df.join(active_t529u_df, pa0000_df.STAT2 == active_t529u_df.STATV)# Unisciti con il DataFrame dell'intervallo di date per ottenere una timeline dei dipendenti attiviactive_employee_timeline_df = date_df.join(active_pa0000_df, (date_df.date >= active_pa0000_df.BEGDA) & (date_df.date <= active_pa0000_df.ENDDA))# Mostra il DataFrameactive_employee_timeline_df.show()

Passo 4: Dissolver uma tabela de relacionamento pai-filho determinando o objeto organizacional de maior nível

Neste último passo, queremos construir um DataFrame que represente a estrutura organizacional da empresa e determinar a atribuição organizacional de cada objeto. Especificamente, queremos determinar qual unidade organizacional de maior nível (por exemplo, área ou divisão) cada objeto filho está atribuído.

A estrutura organizacional pode ser usada para unir a linha do tempo dos funcionários e obter informações detalhadas sobre a atribuição organizacional de cada funcionário em um estágio posterior. Precisamos utilizar a tabela HRP1001 do SAP para conseguir isso:

Prompt: A seguinte tabela contém a atribuição organizacional de cada funcionário: Tabela HRP1001 - Estrutura organizacionalCampo;Elemento de dados;Domínio;Tipo de dados;Tamanho;Casas decimais;Descrição curta;Tabela de verificaçãoMANDT;MANDT;MANDT;CLNT;3;0;Cliente;T000OTYPE;OTYPE;OTYPE;CHAR;2;0;Tipo de objeto;T778OOBJID;HROBJID;HROBJID;NUMC;8;0;ID do objeto;PLOGI...contendo os seguintes conjuntos de dados de exemplo:MANDT,OTYPE,OBJID,PLVAR,RSIGN,RELAT,ISTAT,PRIOX,BEGDA,ENDDA,VARYF,SEQNR,INFTY,OTJID,SUBTY,AEDTM,UNAME,REASN,HISTO,ITXNR,SCLAS,SOBID,PROZT,ADATANR100,P,00998833,A,209,1,,2018-01-01T00:00:00.0000000,9999-12-31T00:00:00.0000000,CP50035300,000,1001,P 00100769,A209,2020-12-11T00:00:00.0000000,TRP1000,,,00000000,CP,60095300,0.00,

Prompt: A tabela HRP1001 contém informações pai-filho sobre a estrutura organizacional da empresa. A coluna OBJID contém o ID do objeto e SOBID contém o ID do objeto pai relacionado. A coluna SCLAS contém a informação se o ID do objeto é uma posição 'S', unidade organizacional 'O' ou papel 'K'. Em Pyspark: Dê-me o código que cria um DataFrame contendo o ID do objeto da unidade organizacional mais alta disponível de cada objeto dissolvendo recursivamente a hierarquia. Use aliases de coluna significativos.

O modelo linguístico produz uma função recursiva que procura a unidade organizacional de maior nível (‘O’) para cada objeto:

from pyspark.sql import SparkSessionfrom pyspark.sql.functions import col# Inicialize uma sessão Sparkspark = SparkSession.builder.appName("UnidadeOrganizacionalMaisAltaDisponível").getOrCreate()HRP1001_df = spark.read.table("structureVoAGI.HRP1001")# Função recursiva para encontrar a unidade organizacional de maior nível disponíveldef encontrar_unidade_org_mais_alta(df, id_objeto):    # Filtrar linhas para o ID do objeto atual    df_objeto = df.filter(col("OBJID") == id_objeto)        if df_objeto.count() == 0:        return None        # Obter o ID do objeto pai (SOBID) e SCLAS para o objeto atual    id_objeto_pai = df_objeto.select("SOBID").first()[0]    sclas = df_objeto.select("SCLAS").first()[0]        # Encontrar recursivamente a unidade organizacional de maior nível disponível    unidade_org_mais_alta = encontrar_unidade_org_mais_alta(df, id_objeto_pai)        # Se o objeto atual for uma unidade organizacional ('O') ou unidade_org_mais_alta for None, atualizar o resultado    if sclas == 'O' or unidade_org_mais_alta is None:        unidade_org_mais_alta = id_objeto        return unidade_org_mais_alta# Criar uma UDF (Função Definida pelo Usuário) para aplicar a função recursivafrom pyspark.sql.functions import udffrom pyspark.sql.types import StringTypeencontrar_unidade_org_mais_alta_udf = udf(encontrar_unidade_org_mais_alta, StringType())# Aplicar a UDF para encontrar a unidade organizacional de maior nível disponível para cada objetounidade_org_mais_alta_df = df.withColumn("UnidadeOrganizacionalMaisAltaDisponivel", encontrar_unidade_org_mais_alta_udf(df, col("OBJID")))# Selecionar colunas significativas e aliasesresult_df = unidade_org_mais_alta_df.select(    col("OBJID").alias("IDObjeto"),    col("UnidadeOrganizacionalMaisAltaDisponivel").alias("UnidadeOrganizacionalMaisAltaDisponivel"))# Mostrar o DataFrame resultanteresult_df.show()# Parar a sessão Sparkspark.stop()

Mesmo quando o código criado não está muito bem otimizado, ele contém uma função recursiva para dissolver a hierarquia. Usuários que preferem expressões comuns de tabela (CTEs) devem dar a sugestão (usando uma expressão comum de tabela) na solicitação de entrada para criar uma instrução PySpark mais legível e compreensível.

da pyspark.sql import SparkSession# Crea una sessione Sparkspark = SparkSession.builder.appName("OrganizationalUnitHierarchy").getOrCreate()# Definisci il DataFrame HRP1001 (assumendo che sia già caricato)# Sostituisci 'hrp1001_df' con il tuo effettivo DataFrame# Assicurati di avere le colonne appropriate: 'OBJID', 'SOBID', e 'SCLAS'# Registra il DataFrame HRP1001 come una tabella temporanea SQLhrp1001_df.createOrReplaceTempView("hrp1001")# Definisci la query ricorsiva per dissolvere la gerarchia e trovare l'unità organizzativa più altarecursive_query = """WITH RECURSIVE OrgHierarchy AS (  SELECT OBJID, SOBID, SCLAS  FROM hrp1001  WHERE SCLAS = 'O'  -- Inizia con le unità organizzative    UNION ALL    SELECT h.OBJID, p.SOBID, p.SCLAS  FROM OrgHierarchy h  JOIN hrp1001 p ON h.SOBID = p.OBJID)SELECT DISTINCT  h.OBJID AS EmployeeID,  h.SOBID AS HighestOrganizationalUnitFROM OrgHierarchy h"""# Esegui la query ricorsiva e crea il DataFrame risultanteresult_df = spark.sql(recursive_query)# Mostra il risultatresult_df.show()# Ferma la sessione Spark quando haispark.stop()

Entrambe le versioni del codice creeranno il dataframe per la struttura organizzativa gerarchica appiattita, che può essere utilizzato per ulteriori passaggi di integrazione dei dati semplicemente unendolo al DataFrame generato in precedenza.

Dataset contenente informazioni gerarchiche

Conclusioni

Abbiamo mostrato come affrontare compiti di data engineering più avanzati in un caso d’uso pratico estrarre ed integrare dati dai sistemi SAP utilizzando ChatGPT per generare il codice PySpark. I modelli di linguaggio di grandi dimensioni potrebbero non essere ancora perfetti, ma ognuno può già immaginare quanto potenti possano diventare queste tecniche per i data engineer. Ci sono diverse considerazioni chiave:

  • ChatGPT è in grado di comprendere i principi fondamentali dei modelli di dati. Puoi affinare la sua comprensione utilizzando tecniche di prompting per fornire conoscenze più approfondite.
  • Anche se l’approccio non produce codice perfetto al primo tentativo, possiamo facilmente adattare il codice creato per adattarlo ai nostri scenari specifici.
  • Grazie alla vasta disponibilità di documenti di riferimento aperti e basi di conoscenza SAP, l’approccio può essere ampliato a una soluzione di Retrieval-Augmented Generation (RAG).

Per quanto riguarda le best practices di prompt engineering, cerca di essere il più preciso possibile e fornisci gli errorcodes restituiti dal tuo ambiente Spark per sfruttare le capacità di LLM nel rifattorizzare il codice generato. Potrebbero essere necessari più tentativi per affinare il codice, tuttavia l’aggiunta di parole chiave come “precise” al tuo prompt potrebbe aiutare ChatGPT a produrre risultati migliori. Chiedi una spiegazione dettagliata per l’approccio della soluzione, in quanto ciò costringerà il modello del trasformatore di ChatGPT a scavare più in profondità.

Nota: I prompt contenenti gli esempi di dataset csv sono stati troncati a causa dei vincoli di lunghezza di questo articolo.

Gli autori

Markus Stadi è un Senior Cloud Data Engineer presso Dehn SE che lavora nel campo del Data Engineering, Data Science e Data Analytics da molti anni.

Christian Koch è un Enterprise Architect presso BWI GmbH e lettore presso il Nuremberg Institute of Technology Georg Simon Ohm.