diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py
index 675e8c8b7a..94aa51ec78 100644
--- a/bigframes/core/blocks.py
+++ b/bigframes/core/blocks.py
@@ -50,6 +50,7 @@
 import bigframes.core.identifiers
 import bigframes.core.join_def as join_defs
 import bigframes.core.ordering as ordering
+import bigframes.core.pyarrow_utils as pyarrow_utils
 import bigframes.core.schema as bf_schema
 import bigframes.core.sql as sql
 import bigframes.core.utils as utils
@@ -156,6 +157,36 @@ def __init__(
         self._view_ref: Optional[bigquery.TableReference] = None
         self._view_ref_dry_run: Optional[bigquery.TableReference] = None
 
+    @classmethod
+    def from_pyarrow(
+        cls,
+        data: pa.Table,
+        session: bigframes.Session,
+    ) -> Block:
+        column_labels = data.column_names
+
+        # TODO(tswast): Use array_value.promote_offsets() instead once that node is
+        # supported by the local engine.
+        offsets_col = bigframes.core.guid.generate_guid()
+        index_ids = [offsets_col]
+        index_labels = [None]
+
+        # TODO(https://github.com/googleapis/python-bigquery-dataframes/issues/859):
+        # Allow users to specify the "total ordering" column(s) or allow multiple
+        # such columns.
+        data = pyarrow_utils.append_offsets(data, offsets_col=offsets_col)
+
+        # from_pyarrow will normalize the types for us.
+        managed_data = local_data.ManagedArrowTable.from_pyarrow(data)
+        array_value = core.ArrayValue.from_managed(managed_data, session=session)
+        block = cls(
+            array_value,
+            column_labels=column_labels,
+            index_columns=index_ids,
+            index_labels=index_labels,
+        )
+        return block
+
     @classmethod
     def from_local(
         cls,
diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py
index ed999e62c1..f163d25757 100644
--- a/bigframes/pandas/__init__.py
+++ b/bigframes/pandas/__init__.py
@@ -40,6 +40,7 @@
 from bigframes.pandas.io.api import (
     _read_gbq_colab,
     from_glob_path,
+    read_arrow,
     read_csv,
     read_gbq,
     read_gbq_function,
@@ -367,6 +368,7 @@ def reset_session():
     merge,
     qcut,
     read_csv,
+    read_arrow,
     read_gbq,
     _read_gbq_colab,
     read_gbq_function,
diff --git a/bigframes/pandas/io/api.py b/bigframes/pandas/io/api.py
index 608eaf5a82..2aebf59ccb 100644
--- a/bigframes/pandas/io/api.py
+++ b/bigframes/pandas/io/api.py
@@ -44,6 +44,7 @@
     ReadPickleBuffer,
     StorageOptions,
 )
+import pyarrow as pa
 
 import bigframes._config as config
 import bigframes.core.global_session as global_session
@@ -72,6 +73,21 @@
 # method and its arguments.
 
 
+def read_arrow(pa_table: pa.Table) -> bigframes.dataframe.DataFrame:
+    """Load a PyArrow Table to a BigQuery DataFrames DataFrame.
+
+    Args:
+        pa_table (pyarrow.Table):
+            PyArrow table to load data from.
+
+    Returns:
+        bigframes.dataframe.DataFrame:
+            A new DataFrame representing the data from the PyArrow table.
+ """ + session = global_session.get_global_session() + return session.read_arrow(pa_table=pa_table) + + def read_csv( filepath_or_buffer: str | IO["bytes"], *, diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 8cbcf8612e..9d113743cf 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -55,12 +55,14 @@ ReadPickleBuffer, StorageOptions, ) +import pyarrow as pa from bigframes import exceptions as bfe from bigframes import version import bigframes._config.bigquery_options as bigquery_options import bigframes.clients import bigframes.constants +import bigframes.core from bigframes.core import blocks, log_adapter, utils import bigframes.core.pyformat @@ -967,6 +969,22 @@ def _read_pandas_inline( local_block = blocks.Block.from_local(pandas_dataframe, self) return dataframe.DataFrame(local_block) + def read_arrow(self, pa_table: pa.Table) -> bigframes.dataframe.DataFrame: + """Load a PyArrow Table to a BigQuery DataFrames DataFrame. + + Args: + pa_table (pyarrow.Table): + PyArrow table to load data from. + + Returns: + bigframes.dataframe.DataFrame: + A new DataFrame representing the data from the PyArrow table. + """ + import bigframes.dataframe as dataframe + + local_block = blocks.Block.from_pyarrow(pa_table, self) + return dataframe.DataFrame(local_block) + def read_csv( self, filepath_or_buffer: str | IO["bytes"], diff --git a/tests/unit/session/test_io_arrow.py b/tests/unit/session/test_io_arrow.py new file mode 100644 index 0000000000..d5266220d9 --- /dev/null +++ b/tests/unit/session/test_io_arrow.py @@ -0,0 +1,133 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+import datetime
+
+import pyarrow as pa
+import pytest
+
+import bigframes.pandas as bpd
+from bigframes.testing import mocks
+
+
+@pytest.fixture(scope="module")
+def session():
+    # Use the mock session from bigframes.testing
+    return mocks.create_bigquery_session()
+
+
+def test_read_arrow_empty_table(session):
+    empty_table = pa.Table.from_pydict(
+        {
+            "col_a": pa.array([], type=pa.int64()),
+            "col_b": pa.array([], type=pa.string()),
+        }
+    )
+    df = session.read_arrow(empty_table)
+    assert isinstance(df, bpd.DataFrame)
+    assert df.shape == (0, 2)
+    assert list(df.columns) == ["col_a", "col_b"]
+    pd_df = df.to_pandas()
+    assert pd_df.empty
+    assert list(pd_df.columns) == ["col_a", "col_b"]
+    assert pd_df["col_a"].dtype == "Int64"
+    assert pd_df["col_b"].dtype == "string[pyarrow]"
+
+
+@pytest.mark.parametrize(
+    "data,arrow_type,expected_bq_type_kind",
+    [
+        ([1, 2], pa.int8(), "INTEGER"),
+        ([1, 2], pa.int16(), "INTEGER"),
+        ([1, 2], pa.int32(), "INTEGER"),
+        ([1, 2], pa.int64(), "INTEGER"),
+        ([1.0, 2.0], pa.float32(), "FLOAT"),
+        ([1.0, 2.0], pa.float64(), "FLOAT"),
+        ([True, False], pa.bool_(), "BOOLEAN"),
+        (["a", "b"], pa.string(), "STRING"),
+        (["a", "b"], pa.large_string(), "STRING"),
+        ([b"a", b"b"], pa.binary(), "BYTES"),
+        ([b"a", b"b"], pa.large_binary(), "BYTES"),
+        (
+            [
+                pa.scalar(1000, type=pa.duration("s")),
+                pa.scalar(2000, type=pa.duration("s")),
+            ],
+            pa.duration("s"),
+            "INTEGER",
+        ),
+        ([datetime.date(2023, 1, 1)], pa.date32(), "DATE"),
+        (
+            [datetime.datetime(2023, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc)],
+            pa.timestamp("s", tz="UTC"),
+            "TIMESTAMP",
+        ),
+        (
+            [datetime.datetime(2023, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc)],
+            pa.timestamp("ms", tz="UTC"),
+            "TIMESTAMP",
+        ),
+        (
+            [datetime.datetime(2023, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc)],
+            pa.timestamp("us", tz="UTC"),
+            "TIMESTAMP",
+        ),
+        ([datetime.time(12, 34, 56, 789000)], pa.time64("us"), "TIME"),
+    ],
+)
+def test_read_arrow_type_mappings(session, data, arrow_type, expected_bq_type_kind):
+    """
+    Tests that various arrow types are mapped to the expected BigQuery types.
+    This is an indirect check via the resulting DataFrame's schema.
+ """ + pa_table = pa.Table.from_arrays([pa.array(data, type=arrow_type)], names=["col"]) + df = session.read_arrow(pa_table) + + bigquery_schema = df._block.expr.schema.to_bigquery() + assert len(bigquery_schema) == 2 # offsets + value + field = bigquery_schema[-1] + assert field.field_type.upper() == expected_bq_type_kind + + # Also check pandas dtype after conversion for good measure + pd_df = df.to_pandas() + assert pd_df["col"].shape == (len(data),) + + +def test_read_arrow_list_type(session): + pa_table = pa.Table.from_arrays( + [pa.array([[1, 2], [3, 4, 5]], type=pa.list_(pa.int64()))], names=["list_col"] + ) + df = session.read_arrow(pa_table) + + bigquery_schema = df._block.expr.schema.to_bigquery() + assert len(bigquery_schema) == 2 # offsets + value + field = bigquery_schema[-1] + assert field.mode.upper() == "REPEATED" + assert field.field_type.upper() == "INTEGER" + + +def test_read_arrow_struct_type(session): + struct_type = pa.struct([("a", pa.int64()), ("b", pa.string())]) + pa_table = pa.Table.from_arrays( + [pa.array([{"a": 1, "b": "x"}, {"a": 2, "b": "y"}], type=struct_type)], + names=["struct_col"], + ) + df = session.read_arrow(pa_table) + + bigquery_schema = df._block.expr.schema.to_bigquery() + assert len(bigquery_schema) == 2 # offsets + value + field = bigquery_schema[-1] + assert field.field_type.upper() == "RECORD" + assert field.fields[0].name == "a" + assert field.fields[1].name == "b"