From 3d2a78110d34d09bb88920d488a72a02ad2ce92f Mon Sep 17 00:00:00 2001 From: Mateus Kasuya Date: Sat, 8 Mar 2025 17:34:16 -0300 Subject: [PATCH 1/2] feat: write_mode --- src/main.py | 27 +++++++++++++++++---------- src/utils/extract_load.py | 14 +++++++++----- 2 files changed, 26 insertions(+), 15 deletions(-) diff --git a/src/main.py b/src/main.py index 0b1d0a8..ef524ce 100644 --- a/src/main.py +++ b/src/main.py @@ -1,6 +1,5 @@ import os import sys -print("OK") from dotenv import load_dotenv sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) @@ -8,7 +7,7 @@ from src.utils.extract_load import ExtractLoadProcess -def main(): +def main(write_mode: str): load_dotenv() source_user = os.getenv('FIREBIRD_USER') @@ -23,7 +22,7 @@ def main(): destination_port = os.getenv('POSTGRES_PORT') destination_database = os.getenv('POSTGRES_DB') - pipeline = ExtractLoadProcess() + pipeline = ExtractLoadProcess(write_mode= write_mode) # Criando engines source_engine = pipeline.firebird_engine( @@ -63,18 +62,26 @@ def main(): ) print(f'Dados Extraídos com sucesso da source: {table}') - df_cdc = pipeline.change_data_capture(df=source, column='datatlz') + if pipeline.write_mode == "append": - if df_cdc.shape[0] > 0: + df_cdc = pipeline.change_data_capture(df=source, column='datatlz') - pipeline.load_to_destination( + if df_cdc.shape[0] > 0: + + pipeline.load_to_destination( engine=destination_engine, df=df_cdc, table=table ) - print(f'{table} ingerida com sucesso') + print(f'{table} ingerida com sucesso') + + else: + print(f'Não há novos registros, pulando inserção: {table}') - else: - print(f'Não há novos registros, pulando inserção: {table}') + elif pipeline.write_mode == "overwrite": + + pipeline.load_to_destination( + engine=destination_engine, df=source, table=table + ) except Exception as e: print(f'Erro durante o pipeline: {e}') @@ -87,4 +94,4 @@ def main(): if __name__ == '__main__': - main() + main("ovewrite") diff --git a/src/utils/extract_load.py b/src/utils/extract_load.py index 60082f9..7994da8 100644 --- a/src/utils/extract_load.py +++ b/src/utils/extract_load.py @@ -19,6 +19,13 @@ class ExtractLoadProcess(DbEngine): Classe responsável por extrair dados de um banco Firebird e carregá-los em um banco PostgreSQL. """ + def __init__(self, write_mode: str): + """ + write_mode : str, opcional + Modo de escrita no banco de dados. Pode ser "append" para ativar o cdc ou "overwrite" para dar full load. + """ + self.write_mode = write_mode + def extract_from_source(self, engine: Engine, query: str) -> pd.DataFrame: """ Extrai dados do banco Firebird e retorna um DataFrame. @@ -78,8 +85,7 @@ def load_to_destination( self, engine: Engine, df: pd.DataFrame, - table: str, - write_mode: str = 'append', + table: str ): """ Carrega um DataFrame para um banco de dados PostgreSQL. @@ -92,8 +98,6 @@ def load_to_destination( DataFrame a ser carregado. table : str Nome da tabela de destino. - write_mode : str, opcional - Modo de escrita no banco de dados. Pode ser "append" (padrão) ou "replace". Retorno: ------- @@ -102,7 +106,7 @@ def load_to_destination( try: with engine.connect() as conn: df.to_sql( - name=table, con=conn, if_exists=write_mode, index=False + name=table, con=conn, if_exists=self.write_mode, index=False ) except SQLAlchemyError as e: raise ConnectionError( From 84f913cb14f2b498c4847c608a0b199b531b98f3 Mon Sep 17 00:00:00 2001 From: Mateus Kasuya Date: Sat, 8 Mar 2025 17:37:21 -0300 Subject: [PATCH 2/2] fix: testes extract-load --- tests/test_extract_load.py | 42 ++++++++++++-------------------------- 1 file changed, 13 insertions(+), 29 deletions(-) diff --git a/tests/test_extract_load.py b/tests/test_extract_load.py index 954de34..e7232c4 100644 --- a/tests/test_extract_load.py +++ b/tests/test_extract_load.py @@ -1,17 +1,15 @@ from unittest.mock import MagicMock, patch - import pandas as pd import pytest from sqlalchemy.exc import SQLAlchemyError +from datetime import datetime, timedelta -from src.utils.extract_load import ( - ExtractLoadProcess, # Ajuste conforme a estrutura do seu projeto -) +from src.utils.extract_load import ExtractLoadProcess @pytest.fixture def extract_load(): - return ExtractLoadProcess() + return ExtractLoadProcess(write_mode="append") def test_extract_from_source(extract_load): @@ -20,9 +18,7 @@ def test_extract_from_source(extract_load): mock_df = pd.DataFrame({'id': [1, 2, 3], 'value': ['a', 'b', 'c']}) with patch('pandas.read_sql', return_value=mock_df) as mock_read_sql: - result_df = extract_load.extract_from_source( - mock_engine, 'SELECT * FROM table' - ) + result_df = extract_load.extract_from_source(mock_engine, 'SELECT * FROM table') mock_read_sql.assert_called_once() assert result_df.equals(mock_df) @@ -31,22 +27,17 @@ def test_extract_from_source_failure(extract_load): mock_engine = MagicMock() mock_engine.connect.side_effect = SQLAlchemyError('Erro na conexão') - with pytest.raises( - ConnectionError, - match='Erro ao ler dados do banco de origem: Erro na conexão', - ): + with pytest.raises(ConnectionError, match='Erro ao ler dados do banco de origem: Erro na conexão'): extract_load.extract_from_source(mock_engine, 'SELECT * FROM table') def test_change_data_capture(extract_load): - from datetime import datetime, timedelta - hoje = datetime.today().date() ontem = hoje - timedelta(days=1) df = pd.DataFrame( { 'id': [1, 2, 3, 4], - 'data': [ + 'datatlz': [ pd.Timestamp(ontem), pd.Timestamp(hoje), pd.Timestamp('2023-01-01'), @@ -55,11 +46,11 @@ def test_change_data_capture(extract_load): } ) - result_df = extract_load.change_data_capture(df, 'data') + result_df = extract_load.change_data_capture(df, 'datatlz') assert len(result_df) == 3 - assert all(result_df['data'].dt.date >= ontem) - assert all(result_df['data'].dt.date <= hoje) + assert all(result_df['datatlz'].dt.date >= ontem) + assert all(result_df['datatlz'].dt.date <= hoje) def test_load_to_destination(extract_load): @@ -68,11 +59,9 @@ def test_load_to_destination(extract_load): df = pd.DataFrame({'id': [1, 2, 3], 'value': ['a', 'b', 'c']}) with patch('pandas.DataFrame.to_sql') as mock_to_sql: - extract_load.load_to_destination( - mock_engine, df, 'dest_table', 'append' - ) + extract_load.load_to_destination(mock_engine, df, 'dest_table') mock_to_sql.assert_called_once_with( - name='dest_table', con=mock_conn, if_exists='append', index=False + name='dest_table', con=mock_conn, if_exists="append", index=False ) @@ -80,14 +69,9 @@ def test_load_to_destination_failure(extract_load): mock_engine = MagicMock() df = pd.DataFrame({'id': [1, 2, 3], 'value': ['a', 'b', 'c']}) - with patch( - 'pandas.DataFrame.to_sql', - side_effect=SQLAlchemyError('Erro ao inserir dados'), - ): + with patch('pandas.DataFrame.to_sql', side_effect=SQLAlchemyError('Erro ao inserir dados')): with pytest.raises( ConnectionError, match='Erro ao gravar dados no banco destino: Erro ao inserir dados', ): - extract_load.load_to_destination( - mock_engine, df, 'dest_table', 'append' - ) + extract_load.load_to_destination(mock_engine, df, 'dest_table')