diff --git a/src/main.py b/src/main.py index fe96f5c..a7f3317 100644 --- a/src/main.py +++ b/src/main.py @@ -1,5 +1,6 @@ import os import sys + from dotenv import load_dotenv sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) @@ -22,7 +23,7 @@ def main(write_mode: str): destination_port = os.getenv('POSTGRES_PORT') destination_database = os.getenv('POSTGRES_DB') - pipeline = ExtractLoadProcess(write_mode= write_mode) + pipeline = ExtractLoadProcess(write_mode=write_mode) # Criando engines source_engine = pipeline.firebird_engine( @@ -62,9 +63,11 @@ def main(write_mode: str): ) print(f'Dados Extraídos com sucesso da source: {table}') - if pipeline.write_mode == "append": + if pipeline.write_mode == 'append': - df_cdc = pipeline.change_data_capture(df=source, column='datatlz') + df_cdc = pipeline.change_data_capture( + df=source, column='datatlz' + ) if df_cdc.shape[0] > 0: @@ -77,7 +80,7 @@ def main(write_mode: str): else: print(f'Não há novos registros, pulando inserção: {table}') - elif pipeline.write_mode == "replace": + elif pipeline.write_mode == 'replace': pipeline.load_to_destination( engine=destination_engine, df=source, table=table @@ -94,4 +97,4 @@ def main(write_mode: str): if __name__ == '__main__': - main("replace") \ No newline at end of file + main('replace') diff --git a/src/utils/extract_load.py b/src/utils/extract_load.py index 5335700..130f44f 100644 --- a/src/utils/extract_load.py +++ b/src/utils/extract_load.py @@ -82,10 +82,7 @@ def change_data_capture( return df_cdc def load_to_destination( - self, - engine: Engine, - df: pd.DataFrame, - table: str + self, engine: Engine, df: pd.DataFrame, table: str ): """ Carrega um DataFrame para um banco de dados PostgreSQL. @@ -106,9 +103,12 @@ def load_to_destination( try: with engine.connect() as conn: df.to_sql( - name=table, con=conn, if_exists=self.write_mode, index=False + name=table, + con=conn, + if_exists=self.write_mode, + index=False, ) except SQLAlchemyError as e: raise ConnectionError( f'Erro ao gravar dados no banco destino: {e}' - ) \ No newline at end of file + ) diff --git a/tests/test_extract_load.py b/tests/test_extract_load.py index 66149c4..2581ade 100644 --- a/tests/test_extract_load.py +++ b/tests/test_extract_load.py @@ -1,15 +1,16 @@ +from datetime import datetime, timedelta from unittest.mock import MagicMock, patch + import pandas as pd import pytest from sqlalchemy.exc import SQLAlchemyError -from datetime import datetime, timedelta from src.utils.extract_load import ExtractLoadProcess @pytest.fixture def extract_load(): - return ExtractLoadProcess(write_mode="append") + return ExtractLoadProcess(write_mode='append') def test_extract_from_source(extract_load): @@ -18,7 +19,9 @@ def test_extract_from_source(extract_load): mock_df = pd.DataFrame({'id': [1, 2, 3], 'value': ['a', 'b', 'c']}) with patch('pandas.read_sql', return_value=mock_df) as mock_read_sql: - result_df = extract_load.extract_from_source(mock_engine, 'SELECT * FROM table') + result_df = extract_load.extract_from_source( + mock_engine, 'SELECT * FROM table' + ) mock_read_sql.assert_called_once() assert result_df.equals(mock_df) @@ -27,7 +30,10 @@ def test_extract_from_source_failure(extract_load): mock_engine = MagicMock() mock_engine.connect.side_effect = SQLAlchemyError('Erro na conexão') - with pytest.raises(ConnectionError, match='Erro ao ler dados do banco de origem: Erro na conexão'): + with pytest.raises( + ConnectionError, + match='Erro ao ler dados do banco de origem: Erro na conexão', + ): extract_load.extract_from_source(mock_engine, 'SELECT * FROM table') @@ -61,7 +67,7 @@ def test_load_to_destination(extract_load): with patch('pandas.DataFrame.to_sql') as mock_to_sql: extract_load.load_to_destination(mock_engine, df, 'dest_table') mock_to_sql.assert_called_once_with( - name='dest_table', con=mock_conn, if_exists="append", index=False + name='dest_table', con=mock_conn, if_exists='append', index=False ) @@ -69,9 +75,12 @@ def test_load_to_destination_failure(extract_load): mock_engine = MagicMock() df = pd.DataFrame({'id': [1, 2, 3], 'value': ['a', 'b', 'c']}) - with patch('pandas.DataFrame.to_sql', side_effect=SQLAlchemyError('Erro ao inserir dados')): + with patch( + 'pandas.DataFrame.to_sql', + side_effect=SQLAlchemyError('Erro ao inserir dados'), + ): with pytest.raises( ConnectionError, match='Erro ao gravar dados no banco destino: Erro ao inserir dados', ): - extract_load.load_to_destination(mock_engine, df, 'dest_table') \ No newline at end of file + extract_load.load_to_destination(mock_engine, df, 'dest_table')