8000 feat: implement `read_arrow` for loading PyArrow Tables by tswast · Pull Request #1826 · googleapis/python-bigquery-dataframes · GitHub
[go: up one dir, main page]

Skip to content

feat: implement read_arrow for loading PyArrow Tables #1826

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from

Conversation

tswast
8000
Copy link
Collaborator
@tswast tswast commented Jun 16, 2025

This commit introduces the bigframes.pandas.read_arrow() function and the underlying Session.read_arrow() method to allow you to create BigQuery DataFrames DataFrames directly from pyarrow.Table objects.

The implementation mirrors the existing read_pandas() functionality, providing support for different write engines:

  • "default": Chooses between "bigquery_inline" or "bigquery_load" based on the Arrow Table's size.
  • "bigquery_inline": Inlines small tables directly into SQL.
  • "bigquery_load": Uses a BigQuery load job for larger tables.
  • "bigquery_streaming": Uses the BigQuery streaming JSON API.
  • "bigquery_write": Uses the BigQuery Storage Write API.

Key changes:

  • Added read_arrow and _read_arrow to bigframes.session.Session.
  • Added read_arrow to bigframes.session.loader.GbqDataLoader to convert pyarrow.Table to ManagedArrowTable and utilize existing data loading mechanisms.
  • Added the public API bigframes.pandas.read_arrow.
  • Included system tests in tests/system/small/test_read_arrow.py covering various scenarios, data types, and write engines.
  • Exposed read_arrow in bigframes/pandas/__init__.py.
  • Added and refined docstrings for all new methods and functions.

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:

  • Make sure to open an issue as a bug/issue before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea
  • Ensure the tests and linter pass
  • Code coverage does not decrease (if any source code was changed)
  • Appropriate docs were updated (if necessary)

Fixes #735 🦕

This commit introduces the `bigframes.pandas.read_arrow()` function and
the underlying `Session.read_arrow()` method to allow you to create
BigQuery DataFrames DataFrames directly from `pyarrow.Table` objects.

The implementation mirrors the existing `read_pandas()` functionality,
providing support for different write engines:
- "default": Chooses between "bigquery_inline" or "bigquery_load" based
  on the Arrow Table's size.
- "bigquery_inline": Inlines small tables directly into SQL.
- "bigquery_load": Uses a BigQuery load job for larger tables.
- "bigquery_streaming": Uses the BigQuery streaming JSON API.
- "bigquery_write": Uses the BigQuery Storage Write API.

Key changes:
- Added `read_arrow` and `_read_arrow` to `bigframes.session.Session`.
- Added `read_arrow` to `bigframes.session.loader.GbqDataLoader` to
  convert `pyarrow.Table` to `ManagedArrowTable` and utilize existing
  data loading mechanisms.
- Added the public API `bigframes.pandas.read_arrow`.
- Included system tests in `tests/system/small/test_read_arrow.py`
  covering various scenarios, data types, and write engines.
- Exposed `read_arrow` in `bigframes/pandas/__init__.py`.
- Added and refined docstrings for all new methods and functions.
@tswast tswast requested review from a team as code owners June 16, 2025 20:39
@tswast tswast requested a review from shobsi June 16, 2025 20:39
@product-auto-label product-auto-label bot added the size: l Pull request size is large. label Jun 16, 2025
@product-auto-label product-auto-label bot added the api: bigquery Issues related to the googleapis/python-bigquery-dataframes API. label Jun 16, 2025
Comment on lines 509 to 515
@overload
def read_arrow(
arrow_table: pyarrow.Table,
*,
write_engine: constants.WriteEngineType = "default",
) -> bigframes.dataframe.DataFrame:
...
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary overload

Comment on lines 524 to 584
"""Loads DataFrame from a pyarrow Table.

The pyarrow Table will be persisted as a temporary BigQuery table, which can be
automatically recycled after the Session is closed.

.. note::
Data is inlined in the query SQL if it is small enough (roughly 5MB
or less in memory). Larger size data is loaded to a BigQuery table
instead.

**Examples:**

>>> import bigframes.pandas as bpd
>>> import pyarrow as pa
>>> bpd.options.display.progress_bar = None

>>> data = [
... pa.array([1, 2, 3]),
... pa.array(["foo", "bar", "baz"]),
... ]
>>> arrow_table = pa.Table.from_arrays(data, names=["id", "value"])
>>> df = bpd.read_arrow(arrow_table)
>>> df
id value
0 1 foo
1 2 bar
2 3 baz
<BLANKLINE>
[2 rows x 2 columns]

Args:
arrow_table (pyarrow.Table):
a pyarrow Table object to be loaded.
write_engine (str):
How data should be written to BigQuery (if at all). Supported
values:

* "default":
(Recommended) Select an appropriate mechanism to write data
to BigQuery. Depends on data size and supported data types.
* "bigquery_inline":
Inline data in BigQuery SQL. Use this when you know the data
is small enough to fit within BigQuery's 1 MB query text size
limit.
* "bigquery_load":
Use a BigQuery load job. Use this for larger data sizes.
* "bigquery_streaming":
Use the BigQuery streaming JSON API. Use this if your
workload is such that you exhaust the BigQuery load job
quota and your data cannot be embedded in SQL due to size or
data type limitations.
* "bigquery_write":
[Preview] Use the BigQuery Storage Write API. This feature
is in public preview.
Returns:
An equivalent bigframes.pandas.DataFrame object

Raises:
ValueError:
When the object is not a pyarrow Table.
"""
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use our .__doc__ trick here to avoid duplication.

Comment on lines +920 to +927
@typing.overload
def read_arrow(
self,
arrow_table: pyarrow.Table,
*,
write_engine: constants.WriteEngineType = "default",
) -> dataframe.DataFrame:
...
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary overload

) -> dataframe.DataFrame:
...

# TODO(b/340350610): Add overloads for pyarrow.RecordBatchReader and other arrow types.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This bug ID is not accurate.


>>> import bigframes.pandas as bpd
>>> import pyarrow as pa
>>> # Assume 'session' is an active BigQuery DataFrames Session
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary comment.

... "product_name": pa.array(["laptop", "tablet", "phone"], type=pa.string()),
... }
>>> arrow_table = pa.Table.from_pydict(data_dict)
>>> df = session.read_arrow(arrow_table)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just use bpd here.

Comment on lines 1094 to 1126
if write_engine == "default":
# Use nbytes as a proxy for in-memory size. This might not be
# perfectly accurate for all Arrow data types, but it's a
# reasonable heuristic.
table_size_bytes = arrow_table.nbytes
if table_size_bytes > bigframes.constants.MAX_INLINE_BYTES:
write_engine = "bigquery_load"
else:
write_engine = "bigquery_inline"
return self._read_arrow(arrow_table, write_engine=write_engine)

if write_engine == "bigquery_inline":
# Assuming Block.from_local can handle pandas DataFrame.
# If Block.from_local is enhanced to take pyarrow.Table directly,
# this conversion can be removed.
pandas_df = arrow_table.to_pandas()
local_block = blocks.Block.from_local(pandas_df, self)
return dataframe.DataFrame(local_block)
elif write_engine == "bigquery_load":
return self._loader.read_arrow(arrow_table, method="load")
elif write_engine == "bigquery_streaming":
return self._loader.read_arrow(arrow_table, method="stream")
elif write_engine == "bigquery_write":
return self._loader.read_arrow(arrow_table, method="write")
# TODO(b/340350610): Deferred loading for arrow tables if needed
# elif write_engine == "_deferred":
# # This would be similar to bigquery_inline but without immediate execution
# # and might require changes to Block.from_local or a new Block.from_arrow
# raise NotImplementedError(
# "Writing pyarrow.Table with '_deferred' is not yet implemented."
# )
else:
raise ValueError(f"Got unexpected write_engine '{write_engine}'")
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should combine this with read_pandas somehow.

Comment on lines +24 to +28
@pytest.fixture(scope="module")
def session():
# Using a module-scoped session to avoid repeated setup/teardown for each test
# This assumes tests are not modifying global session state in a conflicting way
return bpd.get_global_session()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just use the session from conftest

Comment on lines 31 to 32
class TestReadArrow:
def test_read_arrow_basic(self, session):
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should use pytest style.

google-labs-jules bot and others added 2 commits June 16, 2025 21:04
`read_arrow` and `read_pandas` functionalities:

1.  **API Refinements for `read_arrow`**:
    *   Removed unnecessary `@overload` for `bpd.read_arrow` in
        `bigframes/pandas/io/api.py`.
    *   Ensured `bpd.read_arrow` correctly inherits its docstring from
        `Session.read_arrow`.

2.  **Shared Write Engine Logic**:
    *   Refactored `Session._read_arrow` and `Session._read_pandas` in
        `bigframes/session/__init__.py` to use a new shared private method
        `_get_loader_details_for_engine`. This method centralizes the logic
        for determining the final write engine (e.g., "bigquery_inline",
        "bigquery_load"), whether the operation is inline, and the
        corresponding `GbqDataLoader` method name.
    *   Created `_read_arrow_inline` for cleaner separation of inline Arrow
        table processing.

3.  **Test Refactoring and Enhancements**:
    *   Refactored tests in `tests/system/small/test_read_arrow.py` from
        class-based to pytest-style functional tests.
    *   Implemented several new edge case tests for `read_arrow`:
        *   Reading tables with `pyarrow.list_` types.
        *   Testing `write_engine="bigquery_streaming"` and
            `write_engine="bigquery_write"`.
        *   Reading empty tables (0 columns, 0 rows).
        *   Reading tables with special characters in column names.
@@ -506,6 +506,29 @@ def read_pandas(
read_pandas.__doc__ = inspect.getdoc(bigframes.session.Session.read_pandas)


# pyarrow is imported in bigframes.session, but we need it here for the type hint.
import pyarrow
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move to top of file.

Comment on lines +513 to +516
# TODO(b/340350610): Add overloads for pyarrow.RecordBatchReader and other arrow types if needed
# once the implementation details for those types are finalized.
# For now, a single function signature for pyarrow.Table is sufficient as other types
# would likely be converted to a Table first or handled by a different dedicated function.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove

Comment on lines 1061 to 1071
def _read_arrow_inline(
self, arrow_table: pyarrow.Table
) -> dataframe.DataFrame:
"""Creates a BigFrames DataFrame from an in-memory pyarrow Table by inlining data."""
import bigframes.dataframe as dataframe
# Assuming Block.from_local can handle pandas DataFrame.
# If Block.from_local is enhanced to take pyarrow.Table directly,
# this conversion can be removed.
pandas_df = arrow_table.to_pandas()
local_block = blocks.Block.from_local(pandas_df, self)
return dataframe.DataFrame(local_block)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1E79

Just use the deferred method.

self,
arrow_table: pyarrow.Table,
*,
write_engine: constants.WriteEngineType = "default",
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's actually omit this feature for now. Just use the deferred method all the time.

This commit refactors `bigframes.pandas.read_arrow()` and its underlying
Session methods to always use a deferred loading mechanism, removing the
`write_engine` parameter and its associated complexity.

The original `read_pandas` implementation has been restored, and it
retains its `write_engine` functionality.

Key changes:

1.  **`Session.read_arrow` and `Session._read_arrow` Simplification**:
    *   Removed the `write_engine` parameter from these methods in
        `bigframes/session/__init__.py`.
    *   `Session._read_arrow` now directly converts the input
        `pyarrow.Table` to a `pandas.DataFrame` (using
        `types_mapper=pd.ArrowDtype`) and then to a
        `bigframes.core.blocks.Block` using `Block.from_local()`. This
        effectively implements a deferred load.

2.  **Public API `bpd.read_arrow` Simplification**:
    *   Removed the `write_engine` parameter from
        `bigframes.pandas.read_arrow` in `bigframes/pandas/io/api.py`.
    *   Docstrings updated to reflect the removal of `write_engine` and
        the deferred loading behavior.

3.  **`GbqDataLoader.read_arrow` Removal**:
    *   Deleted `GbqDataLoader.read_arrow` from
        `bigframes/session/loader.py` as it is no longer needed.

4.  **`read_pandas` Restoration**:
    *   `Session._read_pandas` in `bigframes/session/__init__.py` has been
        reverted to its original implementation, preserving its
        `write_engine` options and behavior.
    *   The associated helper `_get_loader_details_for_engine` and
        `_read_arrow_inline` (from a previous unsubmitted refactoring)
        have been removed.

5.  **Test Updates**:
    *   Tests in `tests/system/small/test_read_arrow.py` have been
        updated to remove `write_engine` specific scenarios.
    *   Existing tests are verified against the new deferred loading
        mechanism, with pandas comparison DataFrames created using
        `types_mapper=pd.ArrowDtype` for consistency.
@@ -2076,3 +2171,5 @@ def _warn_if_bf_version_is_obsolete():
if today - release_date > datetime.timedelta(days=365):
msg = f"Your BigFrames version {version.__version__} is more than 1 year old. Please update to the lastest version."
warnings.warn(msg, bfe.ObsoleteVersionWarning)

[end of bigframes/session/__init__.py]
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove

Comment on lines +1056 to +1062
import bigframes.dataframe as dataframe
# It's important to use types_mapper=pd.ArrowDtype to preserve Arrow types
# as much as possible when converting to pandas, especially for types
# that might otherwise lose precision or be converted to NumPy types.
pandas_df = arrow_table.to_pandas(types_mapper=pandas.ArrowDtype)
block = blocks.Block.from_local(pandas_df, self)
return dataframe.DataFrame(block)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't convert to pandas. Use loader instead with _deferred

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api: bigquery Issues related to the googleapis/python-bigquery-dataframes API. size: l Pull request size is large.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Polars Support
2 participants
0