8000 feat: Series binary ops compatible with more types by TrevorBergeron · Pull Request #618 · googleapis/python-bigquery-dataframes · GitHub
[go: up one dir, main page]

Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions bigframes/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,18 +117,6 @@ def _compiled_schema(self) -> schemata.ArraySchema:
)
return schemata.ArraySchema(items)

def validate_schema(self):
tree_derived = self.node.schema
ibis_derived = self._compiled_schema
if tree_derived.names != ibis_derived.names:
raise ValueError(
f"Unexpected names internal {tree_derived.names} vs compiled {ibis_derived.names}"
)
if tree_derived.dtypes != ibis_derived.dtypes:
raise ValueError(
f"Unexpected types internal {tree_derived.dtypes} vs compiled {ibis_derived.dtypes}"
)

def _try_evaluate_local(self):
"""Use only for unit testing paths - not fully featured. Will throw exception if fails."""
import ibis
Expand Down
2 changes: 1 addition & 1 deletion bigframes/core/compile/scalar_op_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1366,7 +1366,7 @@ def clip_op(


@scalar_op_compiler.register_nary_op(ops.case_when_op)
def switch_op(*cases_and_outputs: ibis_types.Value) -> ibis_types.Value:
def case_when_op(*cases_and_outputs: ibis_types.Value) -> ibis_types.Value:
# ibis can handle most type coercions, but we need to force bool -> int
# TODO: dispatch coercion depending on bigframes dtype schema
result_values = cases_and_outputs[1::2]
Expand Down
29 changes: 24 additions & 5 deletions bigframes/core/convert.py
8000
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,22 @@
import bigframes.series as series


def to_bf_series(obj, default_index: Optional[index.Index]) -> series.Series:
def is_series_convertible(obj) -> bool:
if isinstance(obj, series.Series):
return True
if isinstance(obj, pd.Series):
return True
if isinstance(obj, index.Index):
return True
if isinstance(obj, pd.Index):
return True
if pd.api.types.is_list_like(obj):
return True
else:
return False


def to_bf_series(obj, default_index: Optional[index.Index], session) -> series.Series:
"""
Convert a an object to a bigframes series

Expand All @@ -37,13 +52,15 @@ def to_bf_series(obj, default_index: Optional[index.Index]) -> series.Series:
if isinstance(obj, series.Series):
return obj
if isinstance(obj, pd.Series):
return series.Series(obj)
return series.Series(obj, session=session)
if isinstance(obj, index.Index):
return series.Series(obj, default_index)
return series.Series(obj, default_index, session=session)
if isinstance(obj, pd.Index):
return series.Series(obj, default_index)
return series.Series(obj, default_index, session=session)
if pd.api.types.is_dict_like(obj):
return series.Series(obj, session=session)
if pd.api.types.is_list_like(obj):
return series.Series(obj, default_index)
return series.Series(obj, default_index, session=session)
else:
raise TypeError(f"Cannot interpret {obj} as series.")

Expand All @@ -69,6 +86,8 @@ def to_pd_series(obj, default_index: pd.Index) -> pd.Series:
return pd.Series(obj.to_pandas(), default_index)
if isinstance(obj, pd.Index):
return pd.Series(obj, default_index)
if pd.api.types.is_dict_like(obj):
return pd.Series(obj)
if pd.api.types.is_list_like(obj):
return pd.Series(obj, default_index)
else:
Expand Down
7 changes: 6 additions & 1 deletion bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ def __init__(
raise ValueError(
f"DataFrame constructor only supports copy=True. {constants.FEEDBACK_LINK}"
)
# just ignore object dtype if provided
if dtype in {numpy.dtypes.ObjectDType, "object"}:
dtype = None

# Check to see if constructing from BigQuery-backed objects before
# falling back to pandas constructor
Expand Down Expand Up @@ -668,7 +671,9 @@ def _apply_binop(
DataFrame(other), op, how=how, reverse=reverse
)
elif utils.get_axis_number(axis) == 0:
bf_series = bigframes.core.convert.to_bf_series(other, self.index)
bf_series = bigframes.core.convert.to_bf_series(
other, self.index, self._session
)
return self._apply_series_binop_axis_0(bf_series, op, how, reverse)
elif utils.get_axis_number(axis) == 1:
pd_series = bigframes.core.convert.to_pd_series(other, self.columns)
Expand Down
128 changes: 79 additions & 49 deletions bigframes/operations/base.py
8000
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
import typing

import bigframes_vendored.pandas.pandas._typing as vendored_pandas_typing
import numpy
import pandas as pd

import bigframes.constants as constants
import bigframes.core.blocks as blocks
import bigframes.core.convert
import bigframes.core.expression as ex
import bigframes.core.indexes as indexes
import bigframes.core.scalar as scalars
Expand All @@ -44,7 +46,19 @@ def __init__(
*,
session: typing.Optional[bigframes.session.Session] = None,
):
block = None
import bigframes.pandas

# just ignore object dtype if provided
if dtype in {numpy.dtypes.ObjectDType, "object"}:
dtype = None

read_pandas_func = (
session.read_pandas
if (session is not None)
else (lambda x: bigframes.pandas.read_pandas(x))
)

block: typing.Optional[blocks.Block] = None
if copy is not None and not copy:
raise ValueError(
f"Series constructor only supports copy=True. {constants.FEEDBACK_LINK}"
Expand All @@ -55,58 +69,75 @@ def __init__(
assert index is None
block = data

elif isinstance(data, SeriesMethods):
block = data._block
# interpret these cases as both index and data
elif (
isinstance(data, SeriesMethods)
or isinstance(data, pd.Series)
or pd.api.types.is_dict_like(data)
):
if isinstance(data, pd.Series):
data = read_pandas_func(data)
elif pd.api.types.is_dict_like(data):
data = read_pandas_func(pd.Series(data, dtype=dtype)) # type: ignore
dtype = None
data_block = data._block
if index is not None:
# reindex
bf_index = indexes.Index(index)
bf_index = indexes.Index(index, session=session)
idx_block = bf_index._block
idx_cols = idx_block.value_columns
block_idx, _ = idx_block.join(block, how="left")
block = block_idx.with_index_labels(bf_index.names)

elif isinstance(data, indexes.Index):
block_idx, _ = idx_block.join(data_block, how="left")
data_block = block_idx.with_index_labels(bf_index.names)
block = data_block

# list-like data that will get default index
elif isinstance(data, indexes.Index) or pd.api.types.is_list_like(data):
data = indexes.Index(data, dtype=dtype, session=session)
dtype = (
None # set to none as it has already been applied, avoid re-cast later
)
if data.nlevels != 1:
raise NotImplementedError("Cannot interpret multi-index as Series.")
# Reset index to promote index columns to value columns, set default index
block = data._block.reset_index(drop=False)
data_block = data._block.reset_index(drop=False).with_column_labels(
data.names
)
if index is not None:
# Align by offset
bf_index = indexes.Index(index)
idx_block = bf_index._block.reset_index(drop=False)
bf_index = indexes.Index(index, session=session)
idx_block = bf_index._block.reset_index(
drop=False
) # reset to align by offsets, and then reset back
idx_cols = idx_block.value_columns
block, (l_mapping, _) = idx_block.join(block, how="left")
block = block.set_index([l_mapping[col] for col in idx_cols])
block = block.with_index_labels(bf_index.names)

if block:
if name:
if not isinstance(name, typing.Hashable):
raise ValueError(
f"BigQuery DataFrames only supports hashable series names. {constants.FEEDBACK_LINK}"
)
block = block.with_column_labels([name])
if dtype:
block = block.multi_apply_unary_op(
block.value_columns, ops.AsTypeOp(to_type=dtype)
)
else:
import bigframes.pandas
data_block, (l_mapping, _) = idx_block.join(data_block, how="left")
data_block = data_block.set_index([l_mapping[col] for col in idx_cols])
data_block = data_block.with_index_labels(bf_index.names)
block = data_block

pd_series = pd.Series(
data=data, index=index, dtype=dtype, name=name # type:ignore
)
pd_dataframe = pd_series.to_frame()
if pd_series.name is None:
# to_frame will set default numeric column label if unnamed, but we do not support int column label, so must rename
pd_dataframe = pd_dataframe.set_axis(["unnamed_col"], axis=1)
if session:
block = session.read_pandas(pd_dataframe)._get_block()
else: # Scalar case
if index is not None:
bf_index = indexes.Index(index, session=session)
else:
# Uses default global session
block = bigframes.pandas.read_pandas(pd_dataframe)._get_block()
if pd_series.name is None:
block = block.with_column_labels([None])
bf_index = indexes.Index(
[] if (data is None) else [0],
session=session,
dtype=bigframes.dtypes.INT_DTYPE,
)
block, _ = bf_index._block.create_constant(data, dtype)
dtype = None
block = block.with_column_labels([name])

assert block is not None
if name:
if not isinstance(name, typing.Hashable):
raise ValueError(
f"BigQuery DataFrames only supports hashable series names. {constants.FEEDBACK_LINK}"
)
block = block.with_column_labels([name])
if dtype:
block = block.multi_apply_unary_op(
block.value_columns, ops.AsTypeOp(to_type=dtype)
)
self._block: blocks.Block = block

@property
Expand Down Expand Up @@ -145,17 +176,16 @@ def _apply_binary_op(
reverse: bool = False,
) -> series.Series:
"""Applies a binary operator to the series and other."""
if isinstance(other, pd.Series):
# TODO: Convert to BigQuery DataFrames series
raise NotImplementedError(
f"Pandas series not supported as operand. {constants.FEEDBACK_LINK}"
if bigframes.core.convert.is_series_convertible(other):
self_index = indexes.Index(self._block)
other_series = bigframes.core.convert.to_bf_series(
other, self_index, self._block.session
)
if isinstance(other, series.Series):
(self_col, other_col, block) = self._align(other, how=alignment)
(self_col, other_col, block) = self._align(other_series, how=alignment)

name = self._name
if (
isinstance(other, series.Series)
hasattr(other, "name")
and other.name != self._name
and alignment == "outer"
):
Expand All @@ -166,7 +196,7 @@ def _apply_binary_op(
block, result_id = block.project_expr(expr, name)
return series.Series(block.select_column(result_id))

else:
else: # Scalar binop
name = self._name
expr = op.as_expr(
ex.const(other) if reverse else self._value_column,
Expand Down
9 changes: 0 additions & 9 deletions bigframes/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import inspect
import itertools
import numbers
import os
import textwrap
import typing
from typing import Any, cast, Literal, Mapping, Optional, Sequence, Tuple, Union
Expand Down Expand Up @@ -73,11 +72,6 @@ def __init__(self, *args, **kwargs):
self._query_job: Optional[bigquery.QueryJob] = None
super().__init__(*args, **kwargs)

# Runs strict validations to ensure internal type predictions and ibis are completely in sync
# Do not execute these validations outside of testing suite.
if "PYTEST_CURRENT_TEST" in os.environ:
self._block.expr.validate_schema()

@property
def dt(self) -> dt.DatetimeMethods:
return dt.DatetimeMethods(self._block)
Expand Down Expand Up @@ -812,9 +806,6 @@ def combine_first(self, other: Series) -> Series:
return result

def update(self, other: Union[Series, Sequence, Mapping]) -> None:
import bigframes.core.convert

other = bigframes.core.convert.to_bf_series(other, default_index=None)
result = self._apply_binary_op(
other, ops.coalesce_op, reverse=True, alignment="left"
)
Expand Down
Loading
0