-
Notifications
You must be signed in to change notification settings - Fork 50
feat: implement read_arrow
for loading PyArrow Tables
#1826
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
ba25f6f
1449ef2
7a8e622
9f30c18
5e9c2bf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -506,6 +506,26 @@ def read_pandas( | |
read_pandas.__doc__ = inspect.getdoc(bigframes.session.Session.read_pandas) | ||
|
||
|
||
# pyarrow is imported in bigframes.session, but we need it here for the type hint. | ||
import pyarrow | ||
|
||
|
||
# TODO(b/340350610): Add overloads for pyarrow.RecordBatchReader and other arrow types if needed | ||
# once the implementation details for those types are finalized. | ||
# For now, a single function signature for pyarrow.Table is sufficient as other types | ||
# would likely be converted to a Table first or handled by a different dedicated function. | ||
Comment on lines
+513
to
+516
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Remove |
||
def read_arrow(
    arrow_table: pyarrow.Table,
) -> bigframes.dataframe.DataFrame:
    # Delegate to Session.read_arrow, letting global_session supply the
    # session the method is invoked with.
    session_method = bigframes.session.Session.read_arrow
    return global_session.with_default_session(session_method, arrow_table)


# Reuse the Session method's docstring so the two entry points share one
# description.
read_arrow.__doc__ = inspect.getdoc(bigframes.session.Session.read_arrow)
|
||
|
||
def read_pickle( | ||
filepath_or_buffer: FilePath | ReadPickleBuffer, | ||
compression: CompressionOptions = "infer", | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -55,6 +55,7 @@ | |
ReadPickleBuffer, | ||
StorageOptions, | ||
) | ||
import pyarrow | ||
|
||
from bigframes import exceptions as bfe | ||
from bigframes import version | ||
|
@@ -916,6 +917,71 @@ def read_pandas( | |
f"read_pandas() expects a pandas.DataFrame, but got a {type(pandas_dataframe)}" | ||
) | ||
|
||
# TODO: Consider accepting pyarrow.RecordBatchReader and other Arrow types.
def read_arrow(
    self,
    arrow_table: pyarrow.Table,
) -> dataframe.DataFrame:
    """Loads a BigQuery DataFrames DataFrame from a ``pyarrow.Table`` object.

    This method uses a deferred loading mechanism: the ``pyarrow.Table`` data
    is kept in memory locally and converted to a BigFrames DataFrame
    representation without immediate BigQuery table materialization.
    Actual computation or data transfer to BigQuery is deferred until an
    action requiring remote execution is triggered on the DataFrame.

    This is the primary session-level API for reading Arrow tables and is
    called by :func:`bigframes.pandas.read_arrow`.

    **Examples:**

        >>> import bigframes.pandas as bpd
        >>> import pyarrow as pa

        >>> data_dict = {
        ...     "id": pa.array([1, 2, 3], type=pa.int64()),
        ...     "product_name": pa.array(["laptop", "tablet", "phone"], type=pa.string()),
        ... }
        >>> arrow_table = pa.Table.from_pydict(data_dict)
        >>> bf_df = bpd.read_arrow(arrow_table)
        >>> bf_df
           id  product_name
        0   1        laptop
        1   2        tablet
        2   3         phone
        <BLANKLINE>
        [3 rows x 2 columns]

    Args:
        arrow_table (pyarrow.Table):
            The ``pyarrow.Table`` object to load.

    Returns:
        bigframes.dataframe.DataFrame:
            A new BigQuery DataFrames DataFrame representing the data from the
            input ``pyarrow.Table``.

    Raises:
        ValueError:
            If the input object is not a ``pyarrow.Table``.
    """
    # Reject anything that is not a Table up front so the caller gets a clear
    # message instead of a failure deeper in the conversion path.
    if not isinstance(arrow_table, pyarrow.Table):
        raise ValueError(
            f"read_arrow() expects a pyarrow.Table, but got a {type(arrow_table)}"
        )
    return self._read_arrow(arrow_table)
|
||
def _read_pandas( | ||
self, | ||
pandas_dataframe: pandas.DataFrame, | ||
|
@@ -952,6 +1018,7 @@ def _read_pandas( | |
elif write_engine == "bigquery_write": | ||
return self._loader.read_pandas(pandas_dataframe, method="write") | ||
elif write_engine == "_deferred": | ||
# Must import here to avoid circular dependency from blocks.py | ||
import bigframes.dataframe as dataframe | ||
|
||
return dataframe.DataFrame(blocks.Block.from_local(pandas_dataframe, self)) | ||
|
@@ -961,11 +1028,39 @@ def _read_pandas( | |
def _read_pandas_inline(
    self, pandas_dataframe: pandas.DataFrame
) -> dataframe.DataFrame:
    """Wraps an in-memory pandas DataFrame as a BigFrames DataFrame.

    The pandas data is turned into a local block bound to this session via
    ``blocks.Block.from_local`` and returned as a DataFrame built on that block.
    """
    # Function-scope import; presumably avoids the same circular dependency
    # noted in _read_pandas -- confirm before moving it to module level.
    import bigframes.dataframe as dataframe

    return dataframe.DataFrame(blocks.Block.from_local(pandas_dataframe, self))
|
||
def _read_arrow(self, arrow_table: pyarrow.Table) -> dataframe.DataFrame:
    """Builds a DataFrame from a ``pyarrow.Table`` held in local memory.

    The Arrow table is first converted to a pandas DataFrame whose columns
    use ``pandas.ArrowDtype``; that frame is then wrapped in a local block
    bound to this session. Invoked by the public :meth:`~Session.read_arrow`.

    Args:
        arrow_table (pyarrow.Table):
            The ``pyarrow.Table`` to load.

    Returns:
        bigframes.dataframe.DataFrame:
            A new DataFrame representing the data from the Arrow table.
    """
    import bigframes.dataframe as dataframe

    # Mapping every column through pandas.ArrowDtype keeps the Arrow types
    # where possible, rather than falling back to NumPy dtypes that may lose
    # precision.
    local_frame = arrow_table.to_pandas(types_mapper=pandas.ArrowDtype)
    return dataframe.DataFrame(blocks.Block.from_local(local_frame, self))
Comment on lines
+1056
to
+1062
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Don't convert to pandas. Use loader instead with |
||
|
||
def read_csv( | ||
self, | ||
filepath_or_buffer: str | IO["bytes"], | ||
|
@@ -2076,3 +2171,5 @@ def _warn_if_bf_version_is_obsolete(): | |
if today - release_date > datetime.timedelta(days=365): | ||
msg = f"Your BigFrames version {version.__version__} is more than 1 year old. Please update to the lastest version." | ||
warnings.warn(msg, bfe.ObsoleteVersionWarning) | ||
|
||
[end of bigframes/session/__init__.py] | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Remove |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move to top of file.