|
import os
import typing
from io import BytesIO
from pathlib import Path

import pytest
from pgpq import ArrowToPostgresBinaryEncoder
from pyarrow import parquet

from psqlpy import ConnectionPool

# Run every coroutine test in this module through the AnyIO pytest plugin.
pytestmark = pytest.mark.anyio

async def test_binary_copy_to_table_in_connection(
    psql_pool: ConnectionPool,
) -> None:
    """Bulk-load a parquet fixture into Postgres via binary COPY on a connection.

    Encodes the MTcars parquet file into a PGCOPY binary stream with
    ``pgpq`` and feeds it to ``Connection.binary_copy_to_table``, then
    checks both the reported insert count and the actual table row count.
    """
    table_name: typing.Final = "cars"
    await psql_pool.execute(f"DROP TABLE IF EXISTS {table_name}")
    # Interpolate table_name into the DDL so it cannot drift from the
    # name used by the COPY and the verification query below.
    await psql_pool.execute(
        f"""
CREATE TABLE IF NOT EXISTS {table_name} (
    model VARCHAR,
    mpg FLOAT8,
    cyl INTEGER,
    disp FLOAT8,
    hp INTEGER,
    drat FLOAT8,
    wt FLOAT8,
    qsec FLOAT8,
    vs INTEGER,
    am INTEGER,
    gear INTEGER,
    carb INTEGER
);
""",
    )

    # Fixture file lives in test_data/ next to this test module.
    parquet_path = Path(__file__).parent / "test_data" / "MTcars.parquet"
    arrow_table = parquet.read_table(parquet_path)

    # Build the PGCOPY binary stream: header, one chunk per record batch,
    # then the end-of-data trailer.
    encoder = ArrowToPostgresBinaryEncoder(arrow_table.schema)
    buf = BytesIO()
    buf.write(encoder.write_header())
    for batch in arrow_table.to_batches():
        buf.write(encoder.write_batch(batch))
    buf.write(encoder.finish())
    buf.seek(0)  # rewind so the COPY reads from the start of the stream

    async with psql_pool.acquire() as connection:
        inserted_rows = await connection.binary_copy_to_table(
            source=buf,
            table_name=table_name,
        )

    expected_inserted_row: typing.Final = 32  # the MTcars dataset has 32 rows

    assert inserted_rows == expected_inserted_row

    real_table_rows: typing.Final = await psql_pool.execute(
        f"SELECT COUNT(*) AS rows_count FROM {table_name}",
    )
    assert real_table_rows.result()[0]["rows_count"] == expected_inserted_row
| 65 | + |
async def test_binary_copy_to_table_in_transaction(
    psql_pool: ConnectionPool,
) -> None:
    """Bulk-load a parquet fixture into Postgres via binary COPY in a transaction.

    Same PGCOPY stream construction as the connection-level test, but the
    COPY itself runs inside an explicit transaction so that transactional
    commit semantics of ``binary_copy_to_table`` are exercised.
    """
    table_name: typing.Final = "cars"
    await psql_pool.execute(f"DROP TABLE IF EXISTS {table_name}")
    # Interpolate table_name into the DDL so it cannot drift from the
    # name used by the COPY and the verification query below.
    await psql_pool.execute(
        f"""
CREATE TABLE IF NOT EXISTS {table_name} (
    model VARCHAR,
    mpg FLOAT8,
    cyl INTEGER,
    disp FLOAT8,
    hp INTEGER,
    drat FLOAT8,
    wt FLOAT8,
    qsec FLOAT8,
    vs INTEGER,
    am INTEGER,
    gear INTEGER,
    carb INTEGER
);
""",
    )

    # Fixture file lives in test_data/ next to this test module.
    parquet_path = Path(__file__).parent / "test_data" / "MTcars.parquet"
    arrow_table = parquet.read_table(parquet_path)

    # Build the PGCOPY binary stream: header, one chunk per record batch,
    # then the end-of-data trailer.
    encoder = ArrowToPostgresBinaryEncoder(arrow_table.schema)
    buf = BytesIO()
    buf.write(encoder.write_header())
    for batch in arrow_table.to_batches():
        buf.write(encoder.write_batch(batch))
    buf.write(encoder.finish())
    buf.seek(0)  # rewind so the COPY reads from the start of the stream

    async with psql_pool.acquire() as connection:
        # The test name promises a transaction: run the COPY inside one
        # (committed on clean exit of the context manager) rather than on
        # the bare connection, which the sibling test already covers.
        async with connection.transaction() as transaction:
            inserted_rows = await transaction.binary_copy_to_table(
                source=buf,
                table_name=table_name,
            )

    expected_inserted_row: typing.Final = 32  # the MTcars dataset has 32 rows

    assert inserted_rows == expected_inserted_row

    real_table_rows: typing.Final = await psql_pool.execute(
        f"SELECT COUNT(*) AS rows_count FROM {table_name}",
    )
    assert real_table_rows.result()[0]["rows_count"] == expected_inserted_row
0 commit comments