feat: implement `read_arrow` for loading PyArrow Tables by tswast · Pull Request #1826 · googleapis/python-bigquery-dataframes
Status: Open · tswast wants to merge 5 commits into main
2 changes: 2 additions & 0 deletions bigframes/pandas/__init__.py
@@ -50,6 +50,7 @@
read_pandas,
read_parquet,
read_pickle,
read_arrow,
)
import bigframes.series
import bigframes.session
@@ -344,6 +345,7 @@ def reset_session():
read_pandas,
read_parquet,
read_pickle,
read_arrow,
remote_function,
to_datetime,
to_timedelta,
20 changes: 20 additions & 0 deletions bigframes/pandas/io/api.py
@@ -506,6 +506,26 @@ def read_pandas(
read_pandas.__doc__ = inspect.getdoc(bigframes.session.Session.read_pandas)


# pyarrow is imported in bigframes.session, but we need it here for the type hint.
import pyarrow
Review comment (tswast, Collaborator Author): Move to top of file.

# TODO(b/340350610): Add overloads for pyarrow.RecordBatchReader and other arrow types if needed
# once the implementation details for those types are finalized.
# For now, a single function signature for pyarrow.Table is sufficient as other types
# would likely be converted to a Table first or handled by a different dedicated function.
Review comment on lines +513 to +516 (tswast, Collaborator Author): Remove

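For context on the TODO above: a pyarrow.RecordBatchReader can already be collapsed into a pyarrow.Table with pyarrow's built-in read_all(), which is presumably the "converted to a Table first" path the comment alludes to. A minimal sketch, not part of this PR (helper name illustrative):

    import pyarrow as pa

    def table_from_reader(reader: pa.RecordBatchReader) -> pa.Table:
        # read_all() drains the stream of record batches into a single
        # in-memory Table, which read_arrow() could then accept as-is.
        return reader.read_all()
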
def read_arrow(
arrow_table: pyarrow.Table,
) -> bigframes.dataframe.DataFrame:
return global_session.with_default_session(
bigframes.session.Session.read_arrow,
arrow_table,
)


read_arrow.__doc__ = inspect.getdoc(bigframes.session.Session.read_arrow)


def read_pickle(
filepath_or_buffer: FilePath | ReadPickleBuffer,
compression: CompressionOptions = "infer",
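Together with the export added in bigframes/pandas/__init__.py above, this wrapper makes read_arrow available at the package level through the default global session. A usage sketch, assuming a project and location are configured so the default session can start:

    import pyarrow as pa
    import bigframes.pandas as bpd

    arrow_table = pa.Table.from_pydict(
        {
            "id": pa.array([1, 2, 3], type=pa.int64()),
            "product_name": pa.array(["laptop", "tablet", "phone"], type=pa.string()),
        }
    )

    # Delegates to Session.read_arrow on the default global session.
    bf_df = bpd.read_arrow(arrow_table)
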
97 changes: 97 additions & 0 deletions bigframes/session/__init__.py
@@ -55,6 +55,7 @@
ReadPickleBuffer,
StorageOptions,
)
import pyarrow

from bigframes import exceptions as bfe
from bigframes import version
@@ -916,6 +917,71 @@ def read_pandas(
f"read_pandas() expects a pandas.DataFrame, but got a {type(pandas_dataframe)}"
)

@typing.overload
def read_arrow(
self,
arrow_table: pyarrow.Table,
*,
write_engine: constants.WriteEngineType = "default",
) -> dataframe.DataFrame:
...
Review comment on lines +920 to +927 (tswast, Collaborator Author): Unnecessary overload

# TODO(b/340350610): Add overloads for pyarrow.RecordBatchReader and other arrow types.
Review comment (tswast, Collaborator Author): This bug ID is not accurate.

def read_arrow(
self,
arrow_table: pyarrow.Table,
) -> dataframe.DataFrame:
"""Loads a BigQuery DataFrames DataFrame from a ``pyarrow.Table`` object.

This method uses a deferred loading mechanism: the ``pyarrow.Table`` data
is kept in memory locally and converted to a BigFrames DataFrame
representation without immediate BigQuery table materialization.
Actual computation or data transfer to BigQuery is deferred until an
action requiring remote execution is triggered on the DataFrame.

This is the primary session-level API for reading Arrow tables and is
called by :func:`bigframes.pandas.read_arrow`.

**Examples:**

>>> import bigframes.pandas as bpd
>>> import pyarrow as pa
>>> # Assume 'session' is an active BigQuery DataFrames Session
Review comment (tswast, Collaborator Author): Unnecessary comment.

>>> data_dict = {
... "id": pa.array([1, 2, 3], type=pa.int64()),
... "product_name": pa.array(["laptop", "tablet", "phone"], type=pa.string()),
... }
>>> arrow_table = pa.Table.from_pydict(data_dict)
>>> bf_df = session.read_arrow(arrow_table)
>>> bf_df
id product_name
0 1 laptop
1 2 tablet
2 3 phone
<BLANKLINE>
[3 rows x 2 columns]

Args:
arrow_table (pyarrow.Table):
The ``pyarrow.Table`` object to load.

Returns:
bigframes.dataframe.DataFrame:
A new BigQuery DataFrames DataFrame representing the data from the
input ``pyarrow.Table``.

Raises:
ValueError:
If the input object is not a ``pyarrow.Table``.
"""
if isinstance(arrow_table, pyarrow.Table):
return self._read_arrow(arrow_table)
else:
raise ValueError(
f"read_arrow() expects a pyarrow.Table, but got a {type(arrow_table)}"
)

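To make the deferred semantics above concrete: constructing the DataFrame does no BigQuery work; only an action that needs remote execution triggers it. A sketch, assuming an active session as in the doctest:

    bf_df = session.read_arrow(arrow_table)  # local only; no BigQuery job yet
    pandas_df = bf_df.to_pandas()  # action that triggers execution/transfer
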
def _read_pandas(
self,
pandas_dataframe: pandas.DataFrame,
@@ -952,6 +1018,7 @@ def _read_pandas(
elif write_engine == "bigquery_write":
return self._loader.read_pandas(pandas_dataframe, method="write")
elif write_engine == "_deferred":
# Must import here to avoid circular dependency from blocks.py
import bigframes.dataframe as dataframe

return dataframe.DataFrame(blocks.Block.from_local(pandas_dataframe, self))
@@ -961,11 +1028,39 @@
def _read_pandas_inline(
self, pandas_dataframe: pandas.DataFrame
) -> dataframe.DataFrame:
"""Creates a BigFrames DataFrame from an in-memory pandas DataFrame by inlining data."""
import bigframes.dataframe as dataframe

local_block = blocks.Block.from_local(pandas_dataframe, self)
return dataframe.DataFrame(local_block)

def _read_arrow(
self,
arrow_table: pyarrow.Table,
) -> dataframe.DataFrame:
"""Internal helper to load a ``pyarrow.Table`` using a deferred mechanism.

Converts the Arrow table to a pandas DataFrame with ArrowDTypes,
then creates a BigFrames block from this local pandas DataFrame.
The data remains in memory until an operation triggers execution.
Called by the public :meth:`~Session.read_arrow`.

Args:
arrow_table (pyarrow.Table):
The ``pyarrow.Table`` to load.

Returns:
bigframes.dataframe.DataFrame:
A new DataFrame representing the data from the Arrow table.
"""
import bigframes.dataframe as dataframe

# It's important to use types_mapper=pandas.ArrowDtype to preserve Arrow
# types as much as possible when converting to pandas, especially for types
# that might otherwise lose precision or be converted to NumPy types.
pandas_df = arrow_table.to_pandas(types_mapper=pandas.ArrowDtype)
block = blocks.Block.from_local(pandas_df, self)
return dataframe.DataFrame(block)
Review comment on lines +1056 to +1062 (tswast, Collaborator Author): Don't convert to pandas. Use loader instead with _deferred

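One possible reading of that suggestion, reusing the ManagedArrowTable / read_managed_data pipeline this PR adds in bigframes/session/loader.py (a sketch only; the method choice is an assumption, and a truly deferred variant would instead keep the ManagedArrowTable local, mirroring the "_deferred" branch of _read_pandas):

    def _read_arrow(
        self,
        arrow_table: pyarrow.Table,
    ) -> dataframe.DataFrame:
        # Keep the data in Arrow form and hand it to the loader rather than
        # round-tripping through pandas; GbqDataLoader.read_arrow (added
        # below) wraps it in a ManagedArrowTable before ingestion.
        return self._loader.read_arrow(arrow_table, method="load")
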

def read_csv(
self,
filepath_or_buffer: str | IO["bytes"],
@@ -2076,3 +2171,5 @@ def _warn_if_bf_version_is_obsolete():
if today - release_date > datetime.timedelta(days=365):
msg = f"Your BigFrames version {version.__version__} is more than 1 year old. Please update to the lastest version."
warnings.warn(msg, bfe.ObsoleteVersionWarning)

[end of bigframes/session/__init__.py]
Review comment (tswast, Collaborator Author): Remove

47 changes: 47 additions & 0 deletions bigframes/session/loader.py
@@ -300,6 +300,53 @@ def read_pandas(
)
return dataframe.DataFrame(block)

def read_arrow(
self,
arrow_table: pa.Table,
method: Literal["load", "stream", "write"],
) -> dataframe.DataFrame:
"""Converts a ``pyarrow.Table`` to a ``ManagedArrowTable`` and loads it.

This method is an internal part of the data loading pipeline for Arrow tables,
called by ``Session._read_arrow`` when the ``write_engine`` is
``"bigquery_load"``, ``"bigquery_streaming"``, or ``"bigquery_write"``.
It prepares the Arrow table for BigQuery ingestion by wrapping it in a
:class:`~bigframes.core.local_data.ManagedArrowTable` and then delegates
the actual loading to :meth:`~GbqDataLoader.read_managed_data`.

Args:
arrow_table (pyarrow.Table):
The ``pyarrow.Table`` to be loaded.
method (Literal["load", "stream", "write"]):
The BigQuery ingestion method to be used by
``read_managed_data`` (e.g., "load", "stream", "write"),
corresponding to the ``write_engine`` chosen in the session layer.

Returns:
bigframes.dataframe.DataFrame:
A BigQuery DataFrames DataFrame representing the loaded data.
"""
from bigframes import dataframe
from bigframes.core import blocks
from bigframes.core.local_data import ManagedArrowTable

managed_arrow_table = ManagedArrowTable(arrow_table)
array_value = self.read_managed_data(managed_arrow_table, method=method)

# For Arrow tables, the index is not explicitly part of the table's
# schema in the same way as pandas. We'll create a default index.
# The actual index creation (e.g. sequential) will be handled by
# subsequent operations or session defaults if needed.
# Column labels are taken directly from the Arrow table.
block = blocks.Block(
array_value,
index_columns=[], # Start with no explicit index columns from arrow table
column_labels=arrow_table.column_names,
# index_labels can also be empty or default based on conventions
index_labels=[],
)
return dataframe.DataFrame(block)

def read_managed_data(
self,
data: local_data.ManagedArrowTable,
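For reference, the docstring above ties the three write engines to the method argument. A sketch of how the session layer might dispatch them, modeled on the existing _read_pandas dispatch (hypothetical: in this PR, Session.read_arrow does not yet accept write_engine):

    # Hypothetical session-side dispatch, not part of this PR:
    if write_engine == "bigquery_load":
        return self._loader.read_arrow(arrow_table, method="load")
    elif write_engine == "bigquery_streaming":
        return self._loader.read_arrow(arrow_table, method="stream")
    elif write_engine == "bigquery_write":
        return self._loader.read_arrow(arrow_table, method="write")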