feat: implement read_arrow for loading PyArrow Tables #1826
base: main
Conversation
This commit introduces the `bigframes.pandas.read_arrow()` function and the underlying `Session.read_arrow()` method, which create BigQuery DataFrames `DataFrame` objects directly from `pyarrow.Table` objects. The implementation mirrors the existing `read_pandas()` functionality, providing support for different write engines:

- "default": Chooses between "bigquery_inline" and "bigquery_load" based on the Arrow Table's size.
- "bigquery_inline": Inlines small tables directly into SQL.
- "bigquery_load": Uses a BigQuery load job for larger tables.
- "bigquery_streaming": Uses the BigQuery streaming JSON API.
- "bigquery_write": Uses the BigQuery Storage Write API.

Key changes:

- Added `read_arrow` and `_read_arrow` to `bigframes.session.Session`.
- Added `read_arrow` to `bigframes.session.loader.GbqDataLoader` to convert `pyarrow.Table` to `ManagedArrowTable` and utilize existing data loading mechanisms.
- Added the public API `bigframes.pandas.read_arrow`.
- Included system tests in `tests/system/small/test_read_arrow.py` covering various scenarios, data types, and write engines.
- Exposed `read_arrow` in `bigframes/pandas/__init__.py`.
- Added and refined docstrings for all new methods and functions.
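For orientation, a minimal usage sketch of the API this PR proposes, based on the description and docstring above (the final signature may change during review):

```python
import pyarrow as pa

import bigframes.pandas as bpd

# Build a small Arrow table in memory.
arrow_table = pa.Table.from_pydict(
    {
        "id": [1, 2, 3],
        "value": ["foo", "bar", "baz"],
    }
)

# "default" picks "bigquery_inline" or "bigquery_load" based on the table's size.
df = bpd.read_arrow(arrow_table)

# An explicit engine can also be requested, e.g. a load job for larger data.
df_loaded = bpd.read_arrow(arrow_table, write_engine="bigquery_load")
```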
bigframes/pandas/io/api.py
Outdated
@overload
def read_arrow(
    arrow_table: pyarrow.Table,
    *,
    write_engine: constants.WriteEngineType = "default",
) -> bigframes.dataframe.DataFrame:
    ...
Unnecessary overload
bigframes/pandas/io/api.py
Outdated
"""Loads DataFrame from a pyarrow Table. | ||
|
||
The pyarrow Table will be persisted as a temporary BigQuery table, which can be | ||
automatically recycled after the Session is closed. | ||
|
||
.. note:: | ||
Data is inlined in the query SQL if it is small enough (roughly 5MB | ||
or less in memory). Larger size data is loaded to a BigQuery table | ||
instead. | ||
|
||
**Examples:** | ||
|
||
>>> import bigframes.pandas as bpd | ||
>>> import pyarrow as pa | ||
>>> bpd.options.display.progress_bar = None | ||
|
||
>>> data = [ | ||
... pa.array([1, 2, 3]), | ||
... pa.array(["foo", "bar", "baz"]), | ||
... ] | ||
>>> arrow_table = pa.Table.from_arrays(data, names=["id", "value"]) | ||
>>> df = bpd.read_arrow(arrow_table) | ||
>>> df | ||
id value | ||
0 1 foo | ||
1 2 bar | ||
2 3 baz | ||
<BLANKLINE> | ||
[2 rows x 2 columns] | ||
|
||
Args: | ||
arrow_table (pyarrow.Table): | ||
a pyarrow Table object to be loaded. | ||
write_engine (str): | ||
How data should be written to BigQuery (if at all). Supported | ||
values: | ||
|
||
* "default": | ||
(Recommended) Select an appropriate mechanism to write data | ||
to BigQuery. Depends on data size and supported data types. | ||
* "bigquery_inline": | ||
Inline data in BigQuery SQL. Use this when you know the data | ||
is small enough to fit within BigQuery's 1 MB query text size | ||
limit. | ||
* "bigquery_load": | ||
Use a BigQuery load job. Use this for larger data sizes. | ||
* "bigquery_streaming": | ||
Use the BigQuery streaming JSON API. Use this if your | ||
workload is such that you exhaust the BigQuery load job | ||
quota and your data cannot be embedded in SQL due to size or | ||
data type limitations. | ||
* "bigquery_write": | ||
[Preview] Use the BigQuery Storage Write API. This feature | ||
is in public preview. | ||
Returns: | ||
An equivalent bigframes.pandas.DataFrame object | ||
|
||
Raises: | ||
ValueError: | ||
When the object is not a pyarrow Table. | ||
""" |
We should use our `.__doc__` trick here to avoid duplication.
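For reference, a sketch of that trick, mirroring how `read_pandas` already forwards its docstring later in this file (assumes `read_arrow` is defined just above):

```python
import inspect

import bigframes.session

# Reuse the Session method's docstring instead of duplicating it here.
read_arrow.__doc__ = inspect.getdoc(bigframes.session.Session.read_arrow)
```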
@typing.overload
def read_arrow(
    self,
    arrow_table: pyarrow.Table,
    *,
    write_engine: constants.WriteEngineType = "default",
) -> dataframe.DataFrame:
    ...
Unnecessary overload
) -> dataframe.DataFrame:
    ...


# TODO(b/340350610): Add overloads for pyarrow.RecordBatchReader and other arrow types.
This bug ID is not accurate.
>>> import bigframes.pandas as bpd
>>> import pyarrow as pa
>>> # Assume 'session' is an active BigQuery DataFrames Session
Unnecessary comment.
bigframes/session/__init__.py
Outdated
... "product_name": pa.array(["laptop", "tablet", "phone"], type=pa.string()), | ||
... } | ||
>>> arrow_table = pa.Table.from_pydict(data_dict) | ||
>>> df = session.read_arrow(arrow_table) |
Just use `bpd` here.
bigframes/session/__init__.py
Outdated
if write_engine == "default": | ||
# Use nbytes as a proxy for in-memory size. This might not be | ||
# perfectly accurate for all Arrow data types, but it's a | ||
# reasonable heuristic. | ||
table_size_bytes = arrow_table.nbytes | ||
if table_size_bytes > bigframes.constants.MAX_INLINE_BYTES: | ||
write_engine = "bigquery_load" | ||
else: | ||
write_engine = "bigquery_inline" | ||
return self._read_arrow(arrow_table, write_engine=write_engine) | ||
|
||
if write_engine == "bigquery_inline": | ||
# Assuming Block.from_local can handle pandas DataFrame. | ||
# If Block.from_local is enhanced to take pyarrow.Table directly, | ||
# this conversion can be removed. | ||
pandas_df = arrow_table.to_pandas() | ||
local_block = blocks.Block.from_local(pandas_df, self) | ||
return dataframe.DataFrame(local_block) | ||
elif write_engine == "bigquery_load": | ||
return self._loader.read_arrow(arrow_table, method="load") | ||
elif write_engine == "bigquery_streaming": | ||
return self._loader.read_arrow(arrow_table, method="stream") | ||
elif write_engine == "bigquery_write": | ||
return self._loader.read_arrow(arrow_table, method="write") | ||
# TODO(b/340350610): Deferred loading for arrow tables if needed | ||
# elif write_engine == "_deferred": | ||
# # This would be similar to bigquery_inline but without immediate execution | ||
# # and might require changes to Block.from_local or a new Block.from_arrow | ||
# raise NotImplementedError( | ||
# "Writing pyarrow.Table with '_deferred' is not yet implemented." | ||
# ) | ||
else: | ||
raise ValueError(f"Got unexpected write_engine '{write_engine}'") |
Should combine this with `read_pandas` somehow.
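One possible shape for that shared logic; a later commit in this PR describes a helper named `_get_loader_details_for_engine`, so this sketch only illustrates the idea and is not the actual implementation:

```python
def _get_loader_details_for_engine(
    self, write_engine: str, nbytes: int
) -> tuple[str, bool, typing.Optional[str]]:
    """Resolve write_engine into (final_engine, is_inline, loader_method_name)."""
    if write_engine == "default":
        write_engine = (
            "bigquery_load"
            if nbytes > bigframes.constants.MAX_INLINE_BYTES
            else "bigquery_inline"
        )
    if write_engine == "bigquery_inline":
        return write_engine, True, None
    methods = {
        "bigquery_load": "load",
        "bigquery_streaming": "stream",
        "bigquery_write": "write",
    }
    if write_engine not in methods:
        raise ValueError(f"Got unexpected write_engine '{write_engine}'")
    return write_engine, False, methods[write_engine]
```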
@pytest.fixture(scope="module") | ||
def session(): | ||
# Using a module-scoped session to avoid repeated setup/teardown for each test | ||
# This assumes tests are not modifying global session state in a conflicting way | ||
return bpd.get_global_session() |
Just use the session from conftest
class TestReadArrow:
    def test_read_arrow_basic(self, session):
Should use pytest style.
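A sketch of that conversion, dropping the class wrapper and relying on the shared `session` fixture from conftest (the test body is illustrative only):

```python
import pyarrow as pa


def test_read_arrow_basic(session):
    arrow_table = pa.Table.from_pydict(
        {"id": [1, 2, 3], "value": ["foo", "bar", "baz"]}
    )
    df = session.read_arrow(arrow_table)
    result = df.to_pandas()
    assert list(result.columns) == ["id", "value"]
    assert len(result) == 3
```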
Refinements to the `read_arrow` and `read_pandas` functionalities:

1. **API Refinements for `read_arrow`**:
   * Removed unnecessary `@overload` for `bpd.read_arrow` in `bigframes/pandas/io/api.py`.
   * Ensured `bpd.read_arrow` correctly inherits its docstring from `Session.read_arrow`.
2. **Shared Write Engine Logic**:
   * Refactored `Session._read_arrow` and `Session._read_pandas` in `bigframes/session/__init__.py` to use a new shared private method `_get_loader_details_for_engine`. This method centralizes the logic for determining the final write engine (e.g., "bigquery_inline", "bigquery_load"), whether the operation is inline, and the corresponding `GbqDataLoader` method name.
   * Created `_read_arrow_inline` for cleaner separation of inline Arrow table processing.
3. **Test Refactoring and Enhancements**:
   * Refactored tests in `tests/system/small/test_read_arrow.py` from class-based to pytest-style functional tests.
   * Implemented several new edge case tests for `read_arrow`:
     * Reading tables with `pyarrow.list_` types.
     * Testing `write_engine="bigquery_streaming"` and `write_engine="bigquery_write"`.
     * Reading empty tables (0 columns, 0 rows).
     * Reading tables with special characters in column names.
@@ -506,6 +506,29 @@ def read_pandas(
read_pandas.__doc__ = inspect.getdoc(bigframes.session.Session.read_pandas)


# pyarrow is imported in bigframes.session, but we need it here for the type hint.
import pyarrow
Move to top of file.
# TODO(b/340350610): Add overloads for pyarrow.RecordBatchReader and other arrow types if needed
# once the implementation details for those types are finalized.
# For now, a single function signature for pyarrow.Table is sufficient as other types
# would likely be converted to a Table first or handled by a different dedicated function.
Remove
bigframes/session/__init__.py
Outdated
def _read_arrow_inline(
    self, arrow_table: pyarrow.Table
) -> dataframe.DataFrame:
    """Creates a BigFrames DataFrame from an in-memory pyarrow Table by inlining data."""
    import bigframes.dataframe as dataframe

    # Assuming Block.from_local can handle pandas DataFrame.
    # If Block.from_local is enhanced to take pyarrow.Table directly,
    # this conversion can be removed.
    pandas_df = arrow_table.to_pandas()
    local_block = blocks.Block.from_local(pandas_df, self)
    return dataframe.DataFrame(local_block)
Just use the deferred method.
bigframes/session/__init__.py
Outdated
self,
arrow_table: pyarrow.Table,
*,
write_engine: constants.WriteEngineType = "default",
Let's actually omit this feature for now. Just use the deferred method all the time.
This commit refactors `bigframes.pandas.read_arrow()` and its underlying Session methods to always use a deferred loading mechanism, removing the `write_engine` parameter and its associated complexity. The original `read_pandas` implementation has been restored, and it retains its `write_engine` functionality.

Key changes:

1. **`Session.read_arrow` and `Session._read_arrow` Simplification**:
   * Removed the `write_engine` parameter from these methods in `bigframes/session/__init__.py`.
   * `Session._read_arrow` now directly converts the input `pyarrow.Table` to a `pandas.DataFrame` (using `types_mapper=pd.ArrowDtype`) and then to a `bigframes.core.blocks.Block` using `Block.from_local()`. This effectively implements a deferred load.
2. **Public API `bpd.read_arrow` Simplification**:
   * Removed the `write_engine` parameter from `bigframes.pandas.read_arrow` in `bigframes/pandas/io/api.py`.
   * Docstrings updated to reflect the removal of `write_engine` and the deferred loading behavior.
3. **`GbqDataLoader.read_arrow` Removal**:
   * Deleted `GbqDataLoader.read_arrow` from `bigframes/session/loader.py` as it is no longer needed.
4. **`read_pandas` Restoration**:
   * `Session._read_pandas` in `bigframes/session/__init__.py` has been reverted to its original implementation, preserving its `write_engine` options and behavior.
   * The associated helpers `_get_loader_details_for_engine` and `_read_arrow_inline` (from a previous unsubmitted refactoring) have been removed.
5. **Test Updates**:
   * Tests in `tests/system/small/test_read_arrow.py` have been updated to remove `write_engine`-specific scenarios.
   * Existing tests are verified against the new deferred loading mechanism, with pandas comparison DataFrames created using `types_mapper=pd.ArrowDtype` for consistency.
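A sketch of the comparison pattern described in point 5, building the expected frame with `types_mapper=pd.ArrowDtype` so dtypes line up (the exact assertions in the PR may differ):

```python
import pandas as pd
import pandas.testing
import pyarrow as pa


def test_read_arrow_roundtrip(session):
    arrow_table = pa.Table.from_pydict({"id": [1, 2, 3], "value": ["a", "b", "c"]})
    df = session.read_arrow(arrow_table)

    # Build the expected pandas frame with ArrowDtype for dtype consistency.
    expected = arrow_table.to_pandas(types_mapper=pd.ArrowDtype)
    pandas.testing.assert_frame_equal(
        df.to_pandas(), expected, check_index_type=False
    )
```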
@@ -2076,3 +2171,5 @@ def _warn_if_bf_version_is_obsolete():
    if today - release_date > datetime.timedelta(days=365):
        msg = f"Your BigFrames version {version.__version__} is more than 1 year old. Please update to the lastest version."
        warnings.warn(msg, bfe.ObsoleteVersionWarning)

[end of bigframes/session/__init__.py]
Remove
import bigframes.dataframe as dataframe

# It's important to use types_mapper=pd.ArrowDtype to preserve Arrow types
# as much as possible when converting to pandas, especially for types
# that might otherwise lose precision or be converted to NumPy types.
pandas_df = arrow_table.to_pandas(types_mapper=pandas.ArrowDtype)
block = blocks.Block.from_local(pandas_df, self)
return dataframe.DataFrame(block)
Don't convert to pandas. Use loader instead with _deferred
Fixes #735 🦕