diff --git a/bigframes/core/reshape/__init__.py b/bigframes/core/reshape/__init__.py index 49ecedcc87..1dc90d1848 100644 --- a/bigframes/core/reshape/__init__.py +++ b/bigframes/core/reshape/__init__.py @@ -11,190 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from __future__ import annotations - -import typing -from typing import Iterable, Literal, Optional, Union - -import bigframes_vendored.constants as constants -import pandas as pd - -import bigframes.core.expression as ex -import bigframes.core.ordering as order -import bigframes.core.utils as utils -import bigframes.core.window_spec as window_specs -import bigframes.dataframe -import bigframes.operations as ops -import bigframes.operations.aggregations as agg_ops -import bigframes.series - - -@typing.overload -def concat( - objs: Iterable[bigframes.series.Series], - *, - axis: typing.Literal["index", 0] = ..., - join=..., - ignore_index=..., -) -> bigframes.series.Series: - ... - - -@typing.overload -def concat( - objs: Iterable[bigframes.dataframe.DataFrame], - *, - axis: typing.Literal["index", 0] = ..., - join=..., - ignore_index=..., -) -> bigframes.dataframe.DataFrame: - ... - - -@typing.overload -def concat( - objs: Iterable[Union[bigframes.dataframe.DataFrame, bigframes.series.Series]], - *, - axis: typing.Literal["columns", 1], - join=..., - ignore_index=..., -) -> bigframes.dataframe.DataFrame: - ... - - -@typing.overload -def concat( - objs: Iterable[Union[bigframes.dataframe.DataFrame, bigframes.series.Series]], - *, - axis=..., - join=..., - ignore_index=..., -) -> Union[bigframes.dataframe.DataFrame, bigframes.series.Series]: - ... - - -def concat( - objs: Iterable[Union[bigframes.dataframe.DataFrame, bigframes.series.Series]], - *, - axis: typing.Union[str, int] = 0, - join: Literal["inner", "outer"] = "outer", - ignore_index: bool = False, -) -> Union[bigframes.dataframe.DataFrame, bigframes.series.Series]: - axis_n = utils.get_axis_number(axis) - if axis_n == 0: - contains_dataframes = any( - isinstance(x, bigframes.dataframe.DataFrame) for x in objs - ) - if not contains_dataframes: - # Special case, all series, so align everything into single column even if labels don't match - series = typing.cast(typing.Iterable[bigframes.series.Series], objs) - names = {s.name for s in series} - # For series case, labels are stripped if they don't all match - if len(names) > 1: - blocks = [s._block.with_column_labels([None]) for s in series] - else: - blocks = [s._block for s in series] - block = blocks[0].concat(blocks[1:], how=join, ignore_index=ignore_index) - return bigframes.series.Series(block) - blocks = [obj._block for obj in objs] - block = blocks[0].concat(blocks[1:], how=join, ignore_index=ignore_index) - return bigframes.dataframe.DataFrame(block) - else: - # Note: does not validate inputs - block_list = [obj._block for obj in objs] - block = block_list[0] - for rblock in block_list[1:]: - block, _ = block.join(rblock, how=join) - return bigframes.dataframe.DataFrame(block) - - -def cut( - x: bigframes.series.Series, - bins: Union[ - int, - pd.IntervalIndex, - Iterable, - ], - *, - labels: Union[Iterable[str], bool, None] = None, -) -> bigframes.series.Series: - if isinstance(bins, int) and bins <= 0: - raise ValueError("`bins` should be a positive integer.") - - if isinstance(bins, Iterable): - if isinstance(bins, pd.IntervalIndex): - as_index: pd.IntervalIndex = bins - bins = tuple((bin.left.item(), bin.right.item()) for bin in bins) - elif len(list(bins)) == 0: - raise ValueError("`bins` iterable should have at least one item") - elif isinstance(list(bins)[0], tuple): - as_index = pd.IntervalIndex.from_tuples(list(bins)) - bins = tuple(bins) - elif pd.api.types.is_number(list(bins)[0]): - bins_list = list(bins) - if len(bins_list) < 2: - raise ValueError( - "`bins` iterable of numeric breaks should have" - " at least two items" - ) - as_index = pd.IntervalIndex.from_breaks(bins_list) - single_type = all([isinstance(n, type(bins_list[0])) for n in bins_list]) - numeric_type = type(bins_list[0]) if single_type else float - bins = tuple( - [ - (numeric_type(bins_list[i]), numeric_type(bins_list[i + 1])) - for i in range(len(bins_list) - 1) - ] - ) - else: - raise ValueError("`bins` iterable should contain tuples or numerics") - - if as_index.is_overlapping: - raise ValueError("Overlapping IntervalIndex is not accepted.") - - if labels is not None and labels is not False: - raise NotImplementedError( - "The 'labels' parameter must be either False or None. " - "Please provide a valid value for 'labels'." - ) - - return x._apply_window_op( - agg_ops.CutOp(bins, labels=labels), window_spec=window_specs.unbound() - ) - - -def qcut( - x: bigframes.series.Series, - q: typing.Union[int, typing.Sequence[float]], - *, - labels: Optional[bool] = None, - duplicates: typing.Literal["drop", "error"] = "error", -) -> bigframes.series.Series: - if isinstance(q, int) and q <= 0: - raise ValueError("`q` should be a positive integer.") - if utils.is_list_like(q): - q = tuple(q) - - if labels is not False: - raise NotImplementedError( - f"Only labels=False is supported in BigQuery DataFrames so far. {constants.FEEDBACK_LINK}" - ) - if duplicates != "drop": - raise NotImplementedError( - f"Only duplicates='drop' is supported in BigQuery DataFrames so far. {constants.FEEDBACK_LINK}" - ) - block = x._block - label = block.col_id_to_label[x._value_column] - block, nullity_id = block.apply_unary_op(x._value_column, ops.notnull_op) - block, result = block.apply_window_op( - x._value_column, - agg_ops.QcutOp(q), # type: ignore - window_spec=window_specs.unbound( - grouping_keys=(nullity_id,), - ordering=(order.ascending_over(x._value_column),), - ), - ) - block, result = block.project_expr( - ops.where_op.as_expr(result, nullity_id, ex.const(None)), label=label - ) - return bigframes.series.Series(block.select_column(result)) diff --git a/bigframes/core/joins/__init__.py b/bigframes/core/reshape/api.py similarity index 71% rename from bigframes/core/joins/__init__.py rename to bigframes/core/reshape/api.py index 3c5b9605a3..234ca4a2f9 100644 --- a/bigframes/core/joins/__init__.py +++ b/bigframes/core/reshape/api.py @@ -1,4 +1,4 @@ -# Copyright 2023 Google LLC +# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,10 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Helpers to join ArrayValue objects.""" +from bigframes.core.reshape.concat import concat +from bigframes.core.reshape.merge import merge +from bigframes.core.reshape.tile import cut, qcut -from bigframes.core.joins.merge import merge - -__all__ = [ - "merge", -] +__all__ = ["concat", "cut", "qcut", "merge"] diff --git a/bigframes/core/reshape/concat.py b/bigframes/core/reshape/concat.py new file mode 100644 index 0000000000..a42488cbe8 --- /dev/null +++ b/bigframes/core/reshape/concat.py @@ -0,0 +1,106 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import typing +from typing import Iterable, Literal, Union + +import bigframes_vendored.pandas.core.reshape.concat as vendored_pandas_concat + +import bigframes.core.utils as utils +import bigframes.dataframe +import bigframes.series + + +@typing.overload +def concat( + objs: Iterable[bigframes.series.Series], + *, + axis: typing.Literal["index", 0] = ..., + join=..., + ignore_index=..., +) -> bigframes.series.Series: + ... + + +@typing.overload +def concat( + objs: Iterable[bigframes.dataframe.DataFrame], + *, + axis: typing.Literal["index", 0] = ..., + join=..., + ignore_index=..., +) -> bigframes.dataframe.DataFrame: + ... + + +@typing.overload +def concat( + objs: Iterable[Union[bigframes.dataframe.DataFrame, bigframes.series.Series]], + *, + axis: typing.Literal["columns", 1], + join=..., + ignore_index=..., +) -> bigframes.dataframe.DataFrame: + ... + + +@typing.overload +def concat( + objs: Iterable[Union[bigframes.dataframe.DataFrame, bigframes.series.Series]], + *, + axis=..., + join=..., + ignore_index=..., +) -> Union[bigframes.dataframe.DataFrame, bigframes.series.Series]: + ... + + +def concat( + objs: Iterable[Union[bigframes.dataframe.DataFrame, bigframes.series.Series]], + *, + axis: typing.Union[str, int] = 0, + join: Literal["inner", "outer"] = "outer", + ignore_index: bool = False, +) -> Union[bigframes.dataframe.DataFrame, bigframes.series.Series]: + axis_n = utils.get_axis_number(axis) + if axis_n == 0: + contains_dataframes = any( + isinstance(x, bigframes.dataframe.DataFrame) for x in objs + ) + if not contains_dataframes: + # Special case, all series, so align everything into single column even if labels don't match + series = typing.cast(typing.Iterable[bigframes.series.Series], objs) + names = {s.name for s in series} + # For series case, labels are stripped if they don't all match + if len(names) > 1: + blocks = [s._block.with_column_labels([None]) for s in series] + else: + blocks = [s._block for s in series] + block = blocks[0].concat(blocks[1:], how=join, ignore_index=ignore_index) + return bigframes.series.Series(block) + blocks = [obj._block for obj in objs] + block = blocks[0].concat(blocks[1:], how=join, ignore_index=ignore_index) + return bigframes.dataframe.DataFrame(block) + else: + # Note: does not validate inputs + block_list = [obj._block for obj in objs] + block = block_list[0] + for rblock in block_list[1:]: + block, _ = block.join(rblock, how=join) + return bigframes.dataframe.DataFrame(block) + + +concat.__doc__ = vendored_pandas_concat.concat.__doc__ diff --git a/bigframes/core/joins/merge.py b/bigframes/core/reshape/merge.py similarity index 94% rename from bigframes/core/joins/merge.py rename to bigframes/core/reshape/merge.py index 1542cda0af..e1750d5c7a 100644 --- a/bigframes/core/joins/merge.py +++ b/bigframes/core/reshape/merge.py @@ -21,6 +21,8 @@ import typing from typing import Literal, Optional +import bigframes_vendored.pandas.core.reshape.merge as vendored_pandas_merge + # Avoid cirular imports. if typing.TYPE_CHECKING: import bigframes.dataframe @@ -58,6 +60,9 @@ def merge( ) +merge.__doc__ = vendored_pandas_merge.merge.__doc__ + + def _validate_operand( obj: bigframes.dataframe.DataFrame | bigframes.series.Series, ) -> bigframes.dataframe.DataFrame: diff --git a/bigframes/core/reshape/tile.py b/bigframes/core/reshape/tile.py new file mode 100644 index 0000000000..2a2ca9de95 --- /dev/null +++ b/bigframes/core/reshape/tile.py @@ -0,0 +1,129 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import typing +from typing import Iterable, Optional, Union + +import bigframes_vendored.constants as constants +import bigframes_vendored.pandas.core.reshape.tile as vendored_pandas_tile +import pandas as pd + +import bigframes.core.expression as ex +import bigframes.core.ordering as order +import bigframes.core.utils as utils +import bigframes.core.window_spec as window_specs +import bigframes.dataframe +import bigframes.operations as ops +import bigframes.operations.aggregations as agg_ops +import bigframes.series + + +def cut( + x: bigframes.series.Series, + bins: Union[ + int, + pd.IntervalIndex, + Iterable, + ], + *, + labels: Union[Iterable[str], bool, None] = None, +) -> bigframes.series.Series: + if isinstance(bins, int) and bins <= 0: + raise ValueError("`bins` should be a positive integer.") + + if isinstance(bins, Iterable): + if isinstance(bins, pd.IntervalIndex): + as_index: pd.IntervalIndex = bins + bins = tuple((bin.left.item(), bin.right.item()) for bin in bins) + elif len(list(bins)) == 0: + raise ValueError("`bins` iterable should have at least one item") + elif isinstance(list(bins)[0], tuple): + as_index = pd.IntervalIndex.from_tuples(list(bins)) + bins = tuple(bins) + elif pd.api.types.is_number(list(bins)[0]): + bins_list = list(bins) + if len(bins_list) < 2: + raise ValueError( + "`bins` iterable of numeric breaks should have" + " at least two items" + ) + as_index = pd.IntervalIndex.from_breaks(bins_list) + single_type = all([isinstance(n, type(bins_list[0])) for n in bins_list]) + numeric_type = type(bins_list[0]) if single_type else float + bins = tuple( + [ + (numeric_type(bins_list[i]), numeric_type(bins_list[i + 1])) + for i in range(len(bins_list) - 1) + ] + ) + else: + raise ValueError("`bins` iterable should contain tuples or numerics") + + if as_index.is_overlapping: + raise ValueError("Overlapping IntervalIndex is not accepted.") + + if labels is not None and labels is not False: + raise NotImplementedError( + "The 'labels' parameter must be either False or None. " + "Please provide a valid value for 'labels'." + ) + + return x._apply_window_op( + agg_ops.CutOp(bins, labels=labels), window_spec=window_specs.unbound() + ) + + +cut.__doc__ = vendored_pandas_tile.cut.__doc__ + + +def qcut( + x: bigframes.series.Series, + q: typing.Union[int, typing.Sequence[float]], + *, + labels: Optional[bool] = None, + duplicates: typing.Literal["drop", "error"] = "error", +) -> bigframes.series.Series: + if isinstance(q, int) and q <= 0: + raise ValueError("`q` should be a positive integer.") + if utils.is_list_like(q): + q = tuple(q) + + if labels is not False: + raise NotImplementedError( + f"Only labels=False is supported in BigQuery DataFrames so far. {constants.FEEDBACK_LINK}" + ) + if duplicates != "drop": + raise NotImplementedError( + f"Only duplicates='drop' is supported in BigQuery DataFrames so far. {constants.FEEDBACK_LINK}" + ) + block = x._block + label = block.col_id_to_label[x._value_column] + block, nullity_id = block.apply_unary_op(x._value_column, ops.notnull_op) + block, result = block.apply_window_op( + x._value_column, + agg_ops.QcutOp(q), # type: ignore + window_spec=window_specs.unbound( + grouping_keys=(nullity_id,), + ordering=(order.ascending_over(x._value_column),), + ), + ) + block, result = block.project_expr( + ops.where_op.as_expr(result, nullity_id, ex.const(None)), label=label + ) + return bigframes.series.Series(block.select_column(result)) + + +qcut.__doc__ = vendored_pandas_tile.qcut.__doc__ diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index bedc13ecb3..4e175ce31c 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -77,7 +77,6 @@ import bigframes.operations.semantics import bigframes.operations.structs import bigframes.series -import bigframes.series as bf_series import bigframes.session._io.bigquery if typing.TYPE_CHECKING: @@ -141,9 +140,13 @@ def __init__( elif ( utils.is_dict_like(data) and len(data) >= 1 - and any(isinstance(data[key], bf_series.Series) for key in data.keys()) + and any( + isinstance(data[key], bigframes.series.Series) for key in data.keys() + ) ): - if not all(isinstance(data[key], bf_series.Series) for key in data.keys()): + if not all( + isinstance(data[key], bigframes.series.Series) for key in data.keys() + ): # TODO(tbergeron): Support local list/series data by converting to memtable. raise NotImplementedError( f"Cannot mix Series with other types. {constants.FEEDBACK_LINK}" @@ -151,13 +154,13 @@ def __init__( keys = list(data.keys()) first_label, first_series = keys[0], data[keys[0]] block = ( - typing.cast(bf_series.Series, first_series) + typing.cast(bigframes.series.Series, first_series) ._get_block() .with_column_labels([first_label]) ) for key in keys[1:]: - other = typing.cast(bf_series.Series, data[key]) + other = typing.cast(bigframes.series.Series, data[key]) other_block = other._block.with_column_labels([key]) # Pandas will keep original sorting if all indices are aligned. # We cannot detect this easily however, and so always sort on index @@ -1173,7 +1176,7 @@ def combine( results.append(result) if all([isinstance(val, bigframes.series.Series) for val in results]): - import bigframes.core.reshape as rs + import bigframes.core.reshape.api as rs return rs.concat(results, axis=1) else: @@ -2526,7 +2529,7 @@ def describe(self, include: None | Literal["all"] = None) -> DataFrame: elif len(non_numeric_result.columns) == 0: return numeric_result else: - import bigframes.core.reshape as rs + import bigframes.core.reshape.api as rs # Use reindex after join to preserve the original column order. return rs.concat( @@ -3988,7 +3991,7 @@ def _cached(self, *, force: bool = False) -> DataFrame: @validations.requires_ordering() def dot(self, other: _DataFrameOrSeries) -> _DataFrameOrSeries: - if not isinstance(other, (DataFrame, bf_series.Series)): + if not isinstance(other, (DataFrame, bigframes.series.Series)): raise NotImplementedError( f"Only DataFrame or Series operand is supported. {constants.FEEDBACK_LINK}" ) @@ -4063,7 +4066,7 @@ def get_right_id(id): ) result = result[other_frame.columns] - if isinstance(other, bf_series.Series): + if isinstance(other, bigframes.series.Series): # There should be exactly one column in the result result = result[result.columns[0]].rename() diff --git a/bigframes/operations/semantics.py b/bigframes/operations/semantics.py index c605f30765..79b92afe4f 100644 --- a/bigframes/operations/semantics.py +++ b/bigframes/operations/semantics.py @@ -481,7 +481,7 @@ def map( )["ml_generate_text_llm_result"], ) - from bigframes.core.reshape import concat + from bigframes.core.reshape.api import concat return concat([self._df, results.rename(output_column)], axis=1) diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index 721736247c..33b41bb851 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -21,13 +21,10 @@ import inspect import sys import typing -from typing import Any, Iterable, List, Literal, Optional, Sequence, Tuple, Union +from typing import Any, List, Literal, Optional, Sequence, Tuple, Union import bigframes_vendored.constants as constants -import bigframes_vendored.pandas.core.reshape.concat as vendored_pandas_concat import bigframes_vendored.pandas.core.reshape.encoding as vendored_pandas_encoding -import bigframes_vendored.pandas.core.reshape.merge as vendored_pandas_merge -import bigframes_vendored.pandas.core.reshape.tile as vendored_pandas_tile import bigframes_vendored.pandas.core.tools.datetimes as vendored_pandas_datetimes import pandas @@ -36,8 +33,7 @@ import bigframes.core.expression as ex import bigframes.core.global_session as global_session import bigframes.core.indexes -import bigframes.core.joins -import bigframes.core.reshape +from bigframes.core.reshape.api import concat, cut, merge, qcut import bigframes.core.tools import bigframes.dataframe import bigframes.enums @@ -71,81 +67,6 @@ # Include method definition so that the method appears in our docs for # bigframes.pandas general functions. -@typing.overload -def concat( - objs: Iterable[bigframes.series.Series], - *, - axis: typing.Literal["index", 0] = ..., - join=..., - ignore_index=..., -) -> bigframes.series.Series: - ... - - -@typing.overload -def concat( - objs: Iterable[bigframes.dataframe.DataFrame], - *, - axis: typing.Literal["index", 0] = ..., - join=..., - ignore_index=..., -) -> bigframes.dataframe.DataFrame: - ... - - -@typing.overload -def concat( - objs: Iterable[Union[bigframes.dataframe.DataFrame, bigframes.series.Series]], - *, - axis: typing.Literal["columns", 1], - join=..., - ignore_index=..., -) -> bigframes.dataframe.DataFrame: - ... - - -@typing.overload -def concat( - objs: Iterable[Union[bigframes.dataframe.DataFrame, bigframes.series.Series]], - *, - axis=..., - join=..., - ignore_index=..., -) -> Union[bigframes.dataframe.DataFrame, bigframes.series.Series]: - ... - - -def concat( - objs: Iterable[Union[bigframes.dataframe.DataFrame, bigframes.series.Series]], - *, - axis: typing.Union[str, int] = 0, - join: Literal["inner", "outer"] = "outer", - ignore_index: bool = False, -) -> Union[bigframes.dataframe.DataFrame, bigframes.series.Series]: - return bigframes.core.reshape.concat( - objs=objs, axis=axis, join=join, ignore_index=ignore_index - ) - - -concat.__doc__ = vendored_pandas_concat.concat.__doc__ - - -def cut( - x: bigframes.series.Series, - bins: int, - *, - labels: Union[Iterable[str], bool, None] = None, -) -> bigframes.series.Series: - return bigframes.core.reshape.cut( - x, - bins, - labels=labels, - ) - - -cut.__doc__ = vendored_pandas_tile.cut.__doc__ - - def get_dummies( data: Union[DataFrame, Series], prefix: Union[List, dict, str, None] = None, @@ -318,51 +239,6 @@ def _perform_get_dummies_block_operations( return block, intermediate_col_ids -def qcut( - x: bigframes.series.Series, - q: int, - *, - labels: Optional[bool] = None, - duplicates: typing.Literal["drop", "error"] = "error", -) -> bigframes.series.Series: - return bigframes.core.reshape.qcut(x, q, labels=labels, duplicates=duplicates) - - -qcut.__doc__ = vendored_pandas_tile.qcut.__doc__ - - -def merge( - left: DataFrame, - right: DataFrame, - how: Literal[ - "inner", - "left", - "outer", - "right", - "cross", - ] = "inner", - on: Optional[str] = None, - *, - left_on: Optional[str] = None, - right_on: Optional[str] = None, - sort: bool = False, - suffixes: tuple[str, str] = ("_x", "_y"), -) -> DataFrame: - return bigframes.core.joins.merge( - left, - right, - how=how, - on=on, - left_on=left_on, - right_on=right_on, - sort=sort, - suffixes=suffixes, - ) - - -merge.__doc__ = vendored_pandas_merge.merge.__doc__ - - def remote_function( input_types: Union[None, type, Sequence[type]] = None, output_type: Optional[type] = None, diff --git a/bigframes/pandas/io/api.py b/bigframes/pandas/io/api.py index 19ff5e0a05..c24651b43f 100644 --- a/bigframes/pandas/io/api.py +++ b/bigframes/pandas/io/api.py @@ -45,7 +45,6 @@ import bigframes.core.blocks import bigframes.core.global_session as global_session import bigframes.core.indexes -import bigframes.core.joins import bigframes.core.reshape import bigframes.core.tools import bigframes.dataframe @@ -54,7 +53,6 @@ import bigframes.session import bigframes.session._io.bigquery import bigframes.session.clients -import bigframes.version # Note: the following methods are duplicated from Session. This duplication # enables the following: