8000 fix by MateusKasuya · Pull Request #11 · MateusKasuya/firebird_to_postgres · GitHub
[go: up one dir, main page]

Skip to content

fix #11

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 8, 2025
Merged

fix #11

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions src/main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import sys

from dotenv import load_dotenv

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
Expand All @@ -22,7 +23,7 @@ def main(write_mode: str):
destination_port = os.getenv('POSTGRES_PORT')
destination_database = os.getenv('POSTGRES_DB')

pipeline = ExtractLoadProcess(write_mode= write_mode)
pipeline = ExtractLoadProcess(write_mode=write_mode)

# Criando engines
source_engine = pipeline.firebird_engine(
Expand Down Expand Up @@ -62,9 +63,11 @@ def main(write_mode: str):
)
print(f'Dados Extraídos com sucesso da source: {table}')

if pipeline.write_mode == "append":
if pipeline.write_mode == 'append':

df_cdc = pipeline.change_data_capture(df=source, column='datatlz')
df_cdc = pipeline.change_data_capture(
df=source, column='datatlz'
)

if df_cdc.shape[0] > 0:

Expand All @@ -77,7 +80,7 @@ def main(write_mode: str):
else:
print(f'Não há novos registros, pulando inserção: {table}')

elif pipeline.write_mode == "replace":
elif pipeline.write_mode == 'replace':

pipeline.load_to_destination(
engine=destination_engine, df=source, table=table
Expand All @@ -94,4 +97,4 @@ def main(write_mode: str):


if __name__ == '__main__':
main("replace")
main('replace')
12 changes: 6 additions & 6 deletions src/utils/extract_load.py
8000
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,7 @@ def change_data_capture(
return df_cdc

def load_to_destination(
self,
engine: Engine,
df: pd.DataFrame,
table: str
self, engine: Engine, df: pd.DataFrame, table: str
):
"""
Carrega um DataFrame para um banco de dados PostgreSQL.
Expand All @@ -106,9 +103,12 @@ def load_to_destination(
try:
with engine.connect() as conn:
df.to_sql(
name=table, con=conn, if_exists=self.write_mode, index=False
name=table,
con=conn,
if_exists=self.write_mode,
index=False,
)
except SQLAlchemyError as e:
raise ConnectionError(
f'Erro ao gravar dados no banco destino: {e}'
)
)
23 changes: 16 additions & 7 deletions tests/test_extract_load.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
from datetime import datetime, timedelta
from unittest.mock import MagicMock, patch

import pandas as pd
import pytest
from sqlalchemy.exc import SQLAlchemyError
from datetime import datetime, timedelta

from src.utils.extract_load import ExtractLoadProcess


@pytest.fixture
def extract_load():
return ExtractLoadProcess(write_mode="append")
return ExtractLoadProcess(write_mode='append')


def test_extract_from_source(extract_load):
Expand All @@ -18,7 +19,9 @@ def test_extract_from_source(extract_load):
mock_df = pd.DataFrame({'id': [1, 2, 3], 'value': ['a', 'b', 'c']})

with patch('pandas.read_sql', return_value=mock_df) as mock_read_sql:
result_df = extract_load.extract_from_source(mock_engine, 'SELECT * FROM table')
result_df = extract_load.extract_from_source(
mock_engine, 'SELECT * FROM table'
)
mock_read_sql.assert_called_once()
assert result_df.equals(mock_df)

Expand All @@ -27,7 +30,10 @@ def test_extract_from_source_failure(extract_load):
mock_engine = MagicMock()
mock_engine.connect.side_effect = SQLAlchemyError('Erro na conexão')

with pytest.raises(ConnectionError, match='Erro ao ler dados do banco de origem: Erro na conexão'):
with pytest.raises(
ConnectionError,
match='Erro ao ler dados do banco de origem: Erro na conexão',
):
extract_load.extract_from_source(mock_engine, 'SELECT * FROM table')


Expand Down Expand Up @@ -61,17 +67,20 @@ def test_load_to_destination(extract_load):
with patch('pandas.DataFrame.to_sql') as mock_to_sql:
extract_load.load_to_destination(mock_engine, df, 'dest_table')
mock_to_sql.assert_called_once_with(
name='dest_table', con=mock_conn, if_exists="append", index=False
name='dest_table', con=mock_conn, if_exists='append', index=False
)

5FD2
def test_load_to_destination_failure(extract_load):
mock_engine = MagicMock()
df = pd.DataFrame({'id': [1, 2, 3], 'value': ['a', 'b', 'c']})

with patch('pandas.DataFrame.to_sql', side_effect=SQLAlchemyError('Erro ao inserir dados')):
with patch(
'pandas.DataFrame.to_sql',
side_effect=SQLAlchemyError('Erro ao inserir dados'),
):
with pytest.raises(
ConnectionError,
match='Erro ao gravar dados no banco destino: Erro ao inserir dados',
):
extract_load.load_to_destination(mock_engine, df, 'dest_table')
extract_load.load_to_destination(mock_engine, df, 'dest_table')
0