Write mode by MateusKasuya · Pull Request #10 · MateusKasuya/firebird_to_postgres · GitHub
[go: up one dir, main page]

Skip to content

Write mode #10

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 17 additions & 10 deletions src/main.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import os
import sys
print("OK")
from dotenv import load_dotenv

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

from src.utils.extract_load import ExtractLoadProcess


def main():
def main(write_mode: str):
load_dotenv()

source_user = os.getenv('FIREBIRD_USER')
Expand All @@ -23,7 +22,7 @@ def main():
destination_port = os.getenv('POSTGRES_PORT')
destination_database = os.getenv('POSTGRES_DB')

pipeline = ExtractLoadProcess()
pipeline = ExtractLoadProcess(write_mode= write_mode)

# Criando engines
source_engine = pipeline.firebird_engine(
Expand Down Expand Up @@ -63,18 +62,26 @@ def main():
)
print(f'Dados Extraídos com sucesso da source: {table}')

df_cdc = pipeline.change_data_capture(df=source, column='datatlz')
if pipeline.write_mode == "append":

if df_cdc.shape[0] > 0:
df_cdc = pipeline.change_data_capture(df=source, column='datatlz')

pipeline.load_to_destination(
if df_cdc.shape[0] > 0:

pipeline.load_to_destination(
engine=destination_engine, df=df_cdc, table=table
)

print(f'{table} ingerida com sucesso')
print(f'{table} ingerida com sucesso')

else:
print(f'Não há novos registros, pulando inserção: {table}')

else:
print(f'Não há novos registros, pulando inserção: {table}')
elif pipeline.write_mode == "overwrite":

pipeline.load_to_destination(
engine=destination_engine, df=source, table=table
)

except Exception as e:
print(f'Erro durante o pipeline: {e}')
Expand All @@ -87,4 +94,4 @@ def main():


if __name__ == '__main__':
main()
    main("overwrite")
14 changes: 9 additions & 5 deletions src/utils/extract_load.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ class ExtractLoadProcess(DbEngine):
Classe responsável por extrair dados de um banco Firebird e carregá-los em um banco PostgreSQL.
"""

def __init__(self, write_mode: str):
    """
    Initialize the extract-load process.

    Parameters:
    ----------
    write_mode : str
        Write mode for the destination database: "append" enables the
        change-data-capture (incremental) path, "overwrite" performs a
        full load.

    NOTE(review): this value is later passed straight to
    ``DataFrame.to_sql(if_exists=self.write_mode)``, and pandas only
    accepts 'fail', 'replace' or 'append' there — "overwrite" would
    raise a ValueError at load time. Confirm whether "replace" was
    intended, or add validation/mapping here.
    """
    # Stored for branching in the pipeline and for the final to_sql call.
    self.write_mode = write_mode

def extract_from_source(self, engine: Engine, query: str) -> pd.DataFrame:
"""
Extrai dados do banco Firebird e retorna um DataFrame.
Expand Down Expand Up @@ -78,8 +85,7 @@ def load_to_destination(
self,
engine: Engine,
df: pd.DataFrame,
table: str,
write_mode: str = 'append',
table: str
):
"""
Carrega um DataFrame para um banco de dados PostgreSQL.
Expand All @@ -92,8 +98,6 @@ def load_to_destination(
DataFrame a ser carregado.
table : str
Nome da tabela de destino.
write_mode : str, opcional
Modo de escrita no banco de dados. Pode ser "append" (padrão) ou "replace".

Retorno:
-------
Expand All @@ -102,7 +106,7 @@ def load_to_destination(
try:
with engine.connect() as conn:
df.to_sql(
name=table, con=conn, if_exists=write_mode, index=False
name=table, con=conn, if_exists=self.write_mode, index=False
)
except SQLAlchemyError as e:
raise ConnectionError(
Expand Down
42 changes: 13 additions & 29 deletions tests/test_extract_load.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
from unittest.mock import MagicMock, patch

import pandas as pd
import pytest
from sqlalchemy.exc import SQLAlchemyError
from datetime import datetime, timedelta

from src.utils.extract_load import (
ExtractLoadProcess, # Ajuste conforme a estrutura do seu projeto
)
from src.utils.extract_load import ExtractLoadProcess


@pytest.fixture
def extract_load():
return ExtractLoadProcess()
return ExtractLoadProcess(write_mode="append")


def test_extract_from_source(extract_load):
Expand All @@ -20,9 +18,7 @@ def test_extract_from_source(extract_load):
mock_df = pd.DataFrame({'id': [1, 2, 3], 'value': ['a', 'b', 'c']})

with patch('pandas.read_sql', return_value=mock_df) as mock_read_sql:
result_df = extract_load.extract_from_source(
mock_engine, 'SELECT * FROM table'
)
result_df = extract_load.extract_from_source(mock_engine, 'SELECT * FROM table')
mock_read_sql.assert_called_once()
assert result_df.equals(mock_df)

Expand All @@ -31,22 +27,17 @@ def test_extract_from_source_failure(extract_load):
mock_engine = MagicMock()
mock_engine.connect.side_effect = SQLAlchemyError('Erro na conexão')

with pytest.raises(
ConnectionError,
match='Erro ao ler dados do banco de origem: Erro na conexão',
):
with pytest.raises(ConnectionError, match='Erro ao ler dados do banco de origem: Erro na conexão'):
extract_load.extract_from_source(mock_engine, 'SELECT * FROM table')


def test_change_data_capture(extract_load):
from datetime import datetime, timedelta

hoje = datetime.today().date()
ontem = hoje - timedelta(days=1)
df = pd.DataFrame(
{
'id': [1, 2, 3, 4],
'data': [
'datatlz': [
pd.Timestamp(ontem),
pd.Timestamp(hoje),
pd.Timestamp('2023-01-01'),
Expand All @@ -55,11 +46,11 @@ def test_change_data_capture(extract_load):
}
)

result_df = extract_load.change_data_capture(df, 'data')
result_df = extract_load.change_data_capture(df, 'datatlz')

assert len(result_df) == 3
assert all(result_df['data'].dt.date >= ontem)
assert all(result_df['data'].dt.date <= hoje)
assert all(result_df['datatlz'].dt.date >= ontem)
assert all(result_df['datatlz'].dt.date <= hoje)


def test_load_to_destination(extract_load):
Expand All @@ -68,26 +59,19 @@ def test_load_to_destination(extract_load):
df = pd.DataFrame({'id': [1, 2, 3], 'value': ['a', 'b', 'c']})

with patch('pandas.DataFrame.to_sql') as mock_to_sql:
extract_load.load_to_destination(
mock_engine, df, 'dest_table', 'append'
)
extract_load.load_to_destination(mock_engine, df, 'dest_table')
mock_to_sql.assert_called_once_with(
name='dest_table', con=mock_conn, if_exists='append', index=False
name='dest_table', con=mock_conn, if_exists="append", index=False
)


def test_load_to_destination_failure(extract_load):
mock_engine = MagicMock()
df = pd.DataFrame({'id': [1, 2, 3], 'value': ['a', 'b', 'c']})

with patch(
'pandas.DataFrame.to_sql',
side_effect=SQLAlchemyError('Erro ao inserir dados'),
):
with patch('pandas.DataFrame.to_sql', side_effect=SQLAlchemyError('Erro ao inserir dados')):
with pytest.raises(
ConnectionError,
match='Erro ao gravar dados no banco destino: Erro ao inserir dados',
):
extract_load.load_to_destination(
mock_engine, df, 'dest_table', 'append'
)
extract_load.load_to_destination(mock_engine, df, 'dest_table')
0