feat: add `bpd.read_arrow` to convert an Arrow object into a bigframes DataFrame by tswast · Pull Request #1855 · googleapis/python-bigquery-dataframes
Merged
31 changes: 31 additions & 0 deletions bigframes/core/blocks.py
@@ -50,6 +50,7 @@
import bigframes.core.identifiers
import bigframes.core.join_def as join_defs
import bigframes.core.ordering as ordering
import bigframes.core.pyarrow_utils as pyarrow_utils
import bigframes.core.schema as bf_schema
import bigframes.core.sql as sql
import bigframes.core.utils as utils
@@ -156,6 +157,36 @@ def __init__(
self._view_ref: Optional[bigquery.TableReference] = None
self._view_ref_dry_run: Optional[bigquery.TableReference] = None

@classmethod
def from_pyarrow(
cls,
data: pa.Table,
session: bigframes.Session,
) -> Block:
column_labels = data.column_names

# TODO(tswast): Use array_value.promote_offsets() instead once that node is
# supported by the local engine.
offsets_col = bigframes.core.guid.generate_guid()
index_ids = [offsets_col]
index_labels = [None]

# TODO(https://github.com/googleapis/python-bigquery-dataframes/issues/859):
# Allow users to specify the "total ordering" column(s) or allow multiple
# such columns.
data = pyarrow_utils.append_offsets(data, offsets_col=offsets_col)

# from_pyarrow will normalize the types for us.
managed_data = local_data.ManagedArrowTable.from_pyarrow(data)
array_value = core.ArrayValue.from_managed(managed_data, session=session)
block = cls(
array_value,
column_labels=column_labels,
index_columns=index_ids,
index_labels=index_labels,
)
return block

@classmethod
def from_local(
cls,
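The offsets column gives the local table a deterministic total ordering before it is handed to `ManagedArrowTable`. A minimal sketch of the idea behind `pyarrow_utils.append_offsets` (the real helper lives in `bigframes/core/pyarrow_utils.py` and may differ; this implementation is an assumption for illustration):

    import pyarrow as pa

    def append_offsets_sketch(table: pa.Table, offsets_col: str) -> pa.Table:
        # Illustrative only: append a monotonically increasing int64 column
        # that the Block can use as its index / total ordering.
        offsets = pa.array(range(table.num_rows), type=pa.int64())
        return table.append_column(offsets_col, offsets)

    table = pa.Table.from_pydict({"x": ["a", "b", "c"]})
    print(append_offsets_sketch(table, "offsets_guid").column_names)
    # ['x', 'offsets_guid']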
2 changes: 2 additions & 0 deletions bigframes/pandas/__init__.py
@@ -40,6 +40,7 @@
from bigframes.pandas.io.api import (
_read_gbq_colab,
from_glob_path,
read_arrow,
read_csv,
read_gbq,
read_gbq_function,
@@ -367,6 +368,7 @@ def reset_session():
merge,
qcut,
read_arrow,
read_csv,
read_gbq,
_read_gbq_colab,
read_gbq_function,
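Because `read_arrow` is imported from `bigframes.pandas.io.api` and then re-exported, both access paths name the same function object; a quick sanity check:

    import bigframes.pandas as bpd
    from bigframes.pandas import read_arrow

    assert bpd.read_arrow is read_arrow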
16 changes: 16 additions & 0 deletions bigframes/pandas/io/api.py
@@ -44,6 +44,7 @@
ReadPickleBuffer,
StorageOptions,
)
import pyarrow as pa

import bigframes._config as config
import bigframes.core.global_session as global_session
@@ -72,6 +73,21 @@
# method and its arguments.


def read_arrow(pa_table: pa.Table) -> bigframes.dataframe.DataFrame:
"""Load a PyArrow Table to a BigQuery DataFrames DataFrame.

Args:
pa_table (pyarrow.Table):
PyArrow table to load data from.

Returns:
bigframes.dataframe.DataFrame:
A new DataFrame representing the data from the PyArrow table.
"""
session = global_session.get_global_session()
return session.read_arrow(pa_table=pa_table)


def read_csv(
filepath_or_buffer: str | IO["bytes"],
*,
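A usage sketch of the new top-level helper (table contents are illustrative). Per the unit tests below, integer and string columns surface as the pandas `Int64` and `string[pyarrow]` dtypes:

    import pyarrow as pa
    import bigframes.pandas as bpd

    pa_table = pa.Table.from_pydict(
        {
            "id": pa.array([1, 2, 3], type=pa.int64()),
            "name": pa.array(["a", "b", "c"], type=pa.string()),
        }
    )

    df = bpd.read_arrow(pa_table)  # delegates to the global session
    print(df.dtypes)  # id: Int64, name: string[pyarrow]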
18 changes: 18 additions & 0 deletions bigframes/session/__init__.py
@@ -55,12 +55,14 @@
ReadPickleBuffer,
StorageOptions,
)
import pyarrow as pa

from bigframes import exceptions as bfe
from bigframes import version
import bigframes._config.bigquery_options as bigquery_options
import bigframes.clients
import bigframes.constants
import bigframes.core
from bigframes.core import blocks, log_adapter, utils
import bigframes.core.pyformat

@@ -967,6 +969,22 @@ def _read_pandas_inline(
local_block = blocks.Block.from_local(pandas_dataframe, self)
return dataframe.DataFrame(local_block)

def read_arrow(self, pa_table: pa.Table) -> bigframes.dataframe.DataFrame:
"""Load a PyArrow Table to a BigQuery DataFrames DataFrame.

Args:
pa_table (pyarrow.Table):
PyArrow table to load data from.

Returns:
bigframes.dataframe.DataFrame:
A new DataFrame representing the data from the PyArrow table.
"""
import bigframes.dataframe as dataframe

local_block = blocks.Block.from_pyarrow(pa_table, self)
return dataframe.DataFrame(local_block)

def read_csv(
self,
filepath_or_buffer: str | IO["bytes"],
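When you manage sessions explicitly, the method can be called on the session itself. A sketch using the mock-session helper from `bigframes.testing` (the same helper the unit tests below use):

    import pyarrow as pa
    from bigframes.testing import mocks

    session = mocks.create_bigquery_session()
    pa_table = pa.Table.from_pydict({"col": pa.array([1, 2, 3], type=pa.int64())})

    df = session.read_arrow(pa_table)
    print(df.shape)  # (3, 1)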
133 changes: 133 additions & 0 deletions tests/unit/session/test_io_arrow.py
@@ -0,0 +1,133 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime

import pyarrow as pa
import pytest

import bigframes.pandas as bpd
from bigframes.testing import mocks


@pytest.fixture(scope="module")
def session():
# Use the mock session from bigframes.testing
return mocks.create_bigquery_session()


def test_read_arrow_empty_table(session):
empty_table = pa.Table.from_pydict(
{
"col_a": pa.array([], type=pa.int64()),
"col_b": pa.array([], type=pa.string()),
}
)
df = session.read_arrow(empty_table)
assert isinstance(df, bpd.DataFrame)
assert df.shape == (0, 2)
assert list(df.columns) == ["col_a", "col_b"]
pd_df = df.to_pandas()
assert pd_df.empty
assert list(pd_df.columns) == ["col_a", "col_b"]
assert pd_df["col_a"].dtype == "Int64"
assert pd_df["col_b"].dtype == "string[pyarrow]"


@pytest.mark.parametrize(
"data,arrow_type,expected_bq_type_kind",
[
([1, 2], pa.int8(), "INTEGER"),
([1, 2], pa.int16(), "INTEGER"),
([1, 2], pa.int32(), "INTEGER"),
([1, 2], pa.int64(), "INTEGER"),
([1.0, 2.0], pa.float32(), "FLOAT"),
([1.0, 2.0], pa.float64(), "FLOAT"),
([True, False], pa.bool_(), "BOOLEAN"),
(["a", "b"], pa.string(), "STRING"),
(["a", "b"], pa.large_string(), "STRING"),
([b"a", b"b"], pa.binary(), "BYTES"),
([b"a", b"b"], pa.large_binary(), "BYTES"),
(
[
pa.scalar(1000, type=pa.duration("s")),
pa.scalar(2000, type=pa.duration("s")),
],
pa.duration("s"),
"INTEGER",
),
([datetime.date(2023, 1, 1)], pa.date32(), "DATE"),
(
[datetime.datetime(2023, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc)],
pa.timestamp("s", tz="UTC"),
"TIMESTAMP",
),
(
[datetime.datetime(2023, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc)],
pa.timestamp("ms", tz="UTC"),
"TIMESTAMP",
),
(
[datetime.datetime(2023, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc)],
pa.timestamp("us", tz="UTC"),
"TIMESTAMP",
),
([datetime.time(12, 34, 56, 789000)], pa.time64("us"), "TIME"),
],
)
def test_read_arrow_type_mappings(session, data, arrow_type, expected_bq_type_kind):
"""
Tests that various arrow types are mapped to the expected BigQuery types.
This is an indirect check via the resulting DataFrame's schema.
"""
pa_table = pa.Table.from_arrays([pa.array(data, type=arrow_type)], names=["col"])
df = session.read_arrow(pa_table)

bigquery_schema = df._block.expr.schema.to_bigquery()
assert len(bigquery_schema) == 2 # offsets + value
field = bigquery_schema[-1]
assert field.field_type.upper() == expected_bq_type_kind

# Also check pandas dtype after conversion for good measure
pd_df = df.to_pandas()
assert pd_df["col"].shape == (len(data),)


def test_read_arrow_list_type(session):
pa_table = pa.Table.from_arrays(
[pa.array([[1, 2], [3, 4, 5]], type=pa.list_(pa.int64()))], names=["list_col"]
)
df = session.read_arrow(pa_table)

bigquery_schema = df._block.expr.schema.to_bigquery()
assert len(bigquery_schema) == 2 # offsets + value
field = bigquery_schema[-1]
assert field.mode.upper() == "REPEATED"
assert field.field_type.upper() == "INTEGER"


def test_read_arrow_struct_type(session):
struct_type = pa.struct([("a", pa.int64()), ("b", pa.string())])
pa_table = pa.Table.from_arrays(
[pa.array([{"a": 1, "b": "x"}, {"a": 2, "b": "y"}], type=struct_type)],
names=["struct_col"],
)
df = session.read_arrow(pa_table)

bigquery_schema = df._block.expr.schema.to_bigquery()
assert len(bigquery_schema) == 2 # offsets + value
field = bigquery_schema[-1]
assert field.field_type.upper() == "RECORD"
assert field.fields[0].name == "a"
assert field.fields[1].name == "b"
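One mapping worth highlighting from the parametrized cases: `pa.duration(...)` columns land as BigQuery INTEGER rather than a dedicated interval type. A sketch of observing that mapping (note `_block.expr.schema.to_bigquery()` is internal API, used here only because the tests above do the same):

    import pyarrow as pa
    from bigframes.testing import mocks

    session = mocks.create_bigquery_session()
    durations = pa.Table.from_arrays(
        [pa.array([1000, 2000], type=pa.duration("s"))], names=["dur"]
    )

    df = session.read_arrow(durations)
    # Index -1 is the value column; index 0 is the appended offsets column.
    print(df._block.expr.schema.to_bigquery()[-1].field_type)  # INTEGER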