From 149258ef802fefe2c873e880787d309030cbc6ab Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Thu, 5 Dec 2024 01:03:39 +0000 Subject: [PATCH 1/9] feat: Add support for temporal types in dataframe's describe() method --- bigframes/dataframe.py | 53 +++++++++++++++++++--- bigframes/dtypes.py | 8 +++- bigframes/operations/aggregations.py | 3 ++ tests/system/small/test_dataframe.py | 45 ++++++++++++++---- tests/unit/operations/test_aggregations.py | 8 ++-- 5 files changed, 97 insertions(+), 20 deletions(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 185a756fed..e7cf28ef02 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -514,6 +514,17 @@ def select_dtypes(self, include=None, exclude=None) -> DataFrame: ) return DataFrame(self._block.select_columns(selected_columns)) + def _select_exact_dtypes( + self, dtypes: Sequence[bigframes.dtypes.Dtype] + ) -> DataFrame: + """Selects columns without considering inheritance relationships.""" + columns = [ + col_id + for col_id, dtype in zip(self._block.value_columns, self._block.dtypes) + if dtype in dtypes + ] + return DataFrame(self._block.select_columns(columns)) + def _set_internal_query_job(self, query_job: Optional[bigquery.QueryJob]): self._query_job = query_job @@ -2321,7 +2332,10 @@ def melt( def describe(self, include: None | Literal["all"] = None) -> DataFrame: if include is None: - numeric_df = self._drop_non_numeric(permissive=False) + numeric_df = self._select_exact_dtypes( + bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_RESTRICTIVE + + bigframes.dtypes.TEMPORAL_NUMERIC_BIGFRAMES_TYPES + ) if len(numeric_df.columns) == 0: # Describe eligible non-numeric columns return self._describe_non_numeric() @@ -2349,9 +2363,11 @@ def describe(self, include: None | Literal["all"] = None) -> DataFrame: raise ValueError(f"Unsupported include type: {include}") def _describe_numeric(self) -> DataFrame: - return typing.cast( + number_df_result = typing.cast( DataFrame, - self._drop_non_numeric(permissive=False).agg( + self._select_exact_dtypes( + bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_RESTRICTIVE + ).agg( [ "count", "mean", @@ -2364,16 +2380,41 @@ def _describe_numeric(self) -> DataFrame: ] ), ) + temporal_df_result = typing.cast( + DataFrame, + self._select_exact_dtypes( + bigframes.dtypes.TEMPORAL_NUMERIC_BIGFRAMES_TYPES + ).agg(["count"]), + ) + + if len(number_df_result.columns) == 0: + return temporal_df_result + elif len(temporal_df_result.columns) == 0: + return number_df_result + else: + import bigframes.core.reshape as rs + + original_columns = self._select_exact_dtypes( + bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_RESTRICTIVE + + bigframes.dtypes.TEMPORAL_NUMERIC_BIGFRAMES_TYPES + ).columns + + # Use reindex after join to preserve the original column order. 
+            return rs.concat(
+                [number_df_result, temporal_df_result],
+                axis=1,
+            )._reindex_columns(original_columns)
 
     def _describe_non_numeric(self) -> DataFrame:
         return typing.cast(
             DataFrame,
-            self.select_dtypes(
-                include={
+            self._select_exact_dtypes(
+                [
                     bigframes.dtypes.STRING_DTYPE,
                     bigframes.dtypes.BOOL_DTYPE,
                     bigframes.dtypes.BYTES_DTYPE,
-                }
+                    bigframes.dtypes.TIME_DTYPE,
+                ]
             ).agg(["count", "nunique"]),
         )
 
diff --git a/bigframes/dtypes.py b/bigframes/dtypes.py
index c71531f9f3..bb4367ae0d 100644
--- a/bigframes/dtypes.py
+++ b/bigframes/dtypes.py
@@ -221,8 +221,12 @@ class SimpleDtypeInfo:
     BIGNUMERIC_DTYPE,
 ]
 
+TEMPORAL_BIGFRAMES_TYPES = [TIME_DTYPE, DATE_DTYPE, TIMESTAMP_DTYPE, DATETIME_DTYPE]
+# Temporal types that are considered as "numeric" by Pandas
+TEMPORAL_NUMERIC_BIGFRAMES_TYPES = [DATE_DTYPE, TIMESTAMP_DTYPE, DATETIME_DTYPE]
 
-## dtype predicates - use these to maintain consistency
+
+# dtype predicates - use these to maintain consistency
 def is_datetime_like(type_: ExpressionType) -> bool:
     return type_ in (DATETIME_DTYPE, TIMESTAMP_DTYPE)
 
@@ -630,7 +634,7 @@ def can_coerce(source_type: ExpressionType, target_type: ExpressionType) -> bool
         return True  # None can be coerced to any supported type
     else:
         return (source_type == STRING_DTYPE) and (
-            target_type in (DATETIME_DTYPE, TIMESTAMP_DTYPE, TIME_DTYPE, DATE_DTYPE)
+            target_type in TEMPORAL_BIGFRAMES_TYPES
         )
 
diff --git a/bigframes/operations/aggregations.py b/bigframes/operations/aggregations.py
index 3e4e9d1df1..8d20d8c454 100644
--- a/bigframes/operations/aggregations.py
+++ b/bigframes/operations/aggregations.py
@@ -585,6 +585,9 @@ def is_agg_op_supported(dtype: dtypes.Dtype, op: AggregateOp) -> bool:
     if dtype in dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE:
         return True
 
+    if dtype in dtypes.TEMPORAL_BIGFRAMES_TYPES:
+        return isinstance(op, (CountOp, NuniqueOp, MinOp, MaxOp))
+
     if dtype in (dtypes.STRING_DTYPE, dtypes.BOOL_DTYPE, dtypes.BYTES_DTYPE):
         return isinstance(op, (CountOp, NuniqueOp))
 
diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py
index 12ca13eb80..c9836756eb 100644
--- a/tests/system/small/test_dataframe.py
+++ b/tests/system/small/test_dataframe.py
@@ -2672,11 +2672,11 @@ def test_dataframe_agg_int_multi_string(scalars_dfs):
 
 
 @skip_legacy_pandas
-def test_df_describe(scalars_dfs):
+def test_df_describe_non_temporal(scalars_dfs):
     scalars_df, scalars_pandas_df = scalars_dfs
-    # pyarrows time columns fail in pandas
+    # excluding temporal columns here because BigFrames cannot perform percentile operations on them
     unsupported_columns = ["datetime_col", "timestamp_col", "time_col", "date_col"]
-    bf_result = scalars_df.describe().to_pandas()
+    bf_result = scalars_df.drop(columns=unsupported_columns).describe().to_pandas()
 
     modified_pd_df = scalars_pandas_df.drop(columns=unsupported_columns)
     pd_result = modified_pd_df.describe()
@@ -2710,12 +2710,14 @@ def test_df_describe_non_numeric(scalars_dfs, include):
 def test_df_describe_non_numeric(scalars_dfs, include):
     scalars_df, scalars_pandas_df = scalars_dfs
 
-    non_numeric_columns = ["string_col", "bytes_col", "bool_col"]
+    # Excluding "date_col" here because in BigFrames it is typed as PyArrow date32(), which is
+    # considered numeric in Pandas
+    target_columns = ["string_col", "bytes_col", "bool_col", "time_col"]
 
-    modified_bf = scalars_df[non_numeric_columns]
+    modified_bf = scalars_df[target_columns]
     bf_result = modified_bf.describe(include=include).to_pandas()
 
-    modified_pd_df = scalars_pandas_df[non_numeric_columns]
+    modified_pd_df = scalars_pandas_df[target_columns]
     pd_result = modified_pd_df.describe(include=include)
 
     # Reindex results with the specified keys and their order, because
@@ -2727,8 +2729,35 @@ def test_df_describe_non_numeric(scalars_dfs, include):
     ).rename(index={"unique": "nunique"})
 
     pd.testing.assert_frame_equal(
-        pd_result[non_numeric_columns].astype("Int64"),
-        bf_result[non_numeric_columns],
+        pd_result.astype("Int64"),
+        bf_result,
         check_index_type=False,
     )
+
+
+@skip_legacy_pandas
+def test_df_describe_temporal(scalars_dfs):
+    scalars_df, scalars_pandas_df = scalars_dfs
+
+    temporal_columns = ["datetime_col", "timestamp_col", "time_col", "date_col"]
+
+    modified_bf = scalars_df[temporal_columns]
+    bf_result = modified_bf.describe(include="all").to_pandas()
+
+    modified_pd_df = scalars_pandas_df[temporal_columns]
+    pd_result = modified_pd_df.describe(include="all")
+
+    # Reindex results with the specified keys and their order, because
+    # the relative order is not important.
+    bf_result = bf_result.reindex(["count", "nunique"])
+    pd_result = pd_result.reindex(
+        ["count", "unique"]
+        # BF counterpart of "unique" is called "nunique"
+    ).rename(index={"unique": "nunique"})
+
+    pd.testing.assert_frame_equal(
+        pd_result.astype("Float64"),
+        bf_result.astype("Float64"),
         check_index_type=False,
     )
 
diff --git a/tests/unit/operations/test_aggregations.py b/tests/unit/operations/test_aggregations.py
index 68ad48ac29..7554e61b96 100644
--- a/tests/unit/operations/test_aggregations.py
+++ b/tests/unit/operations/test_aggregations.py
@@ -68,10 +68,10 @@ def test_is_agg_op_supported_numeric_support_all(dtype, op):
     [
         (dtypes.STRING_DTYPE, {count_op, nunique_op}),
         (dtypes.BYTES_DTYPE, {count_op, nunique_op}),
-        (dtypes.DATE_DTYPE, set()),
-        (dtypes.TIME_DTYPE, set()),
-        (dtypes.DATETIME_DTYPE, set()),
-        (dtypes.TIMESTAMP_DTYPE, set()),
+        (dtypes.DATE_DTYPE, {count_op, nunique_op, min_op, max_op}),
+        (dtypes.TIME_DTYPE, {count_op, nunique_op, min_op, max_op}),
+        (dtypes.DATETIME_DTYPE, {count_op, nunique_op, min_op, max_op}),
+        (dtypes.TIMESTAMP_DTYPE, {count_op, nunique_op, min_op, max_op}),
         (dtypes.GEO_DTYPE, set()),
     ],
 )

From 70453cb3694edb71eed2cdf10db676d6ae87826b Mon Sep 17 00:00:00 2001
From: Shenyang Cai
Date: Thu, 5 Dec 2024 01:23:19 +0000
Subject: [PATCH 2/9] add type hint to make mypy happy

---
 bigframes/dtypes.py | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/bigframes/dtypes.py b/bigframes/dtypes.py
index bb4367ae0d..69faa056fe 100644
--- a/bigframes/dtypes.py
+++ b/bigframes/dtypes.py
@@ -18,7 +18,7 @@
 import datetime
 import decimal
 import typing
-from typing import Dict, Literal, Union
+from typing import Dict, List, Literal, Union
 
 import bigframes_vendored.constants as constants
 import geopandas as gpd  # type: ignore
@@ -211,7 +211,7 @@ class SimpleDtypeInfo:
 
 # Corresponds to the pandas concept of numeric type (such as when 'numeric_only' is specified in an operation)
 # Pandas is inconsistent, so two definitions are provided, each used in different contexts
-NUMERIC_BIGFRAMES_TYPES_RESTRICTIVE = [
+NUMERIC_BIGFRAMES_TYPES_RESTRICTIVE: List[Dtype] = [
     FLOAT_DTYPE,
     INT_DTYPE,
 ]
@@ -221,9 +221,14 @@ class SimpleDtypeInfo:
     BIGNUMERIC_DTYPE,
 ]
 
-TEMPORAL_BIGFRAMES_TYPES = [TIME_DTYPE, DATE_DTYPE, TIMESTAMP_DTYPE, DATETIME_DTYPE]
+
 # Temporal types that are considered as "numeric" by Pandas
-TEMPORAL_NUMERIC_BIGFRAMES_TYPES = [DATE_DTYPE, TIMESTAMP_DTYPE, DATETIME_DTYPE]
+TEMPORAL_NUMERIC_BIGFRAMES_TYPES: List[Dtype] = [
+    DATE_DTYPE,
+    TIMESTAMP_DTYPE,
+    DATETIME_DTYPE,
+]
+TEMPORAL_BIGFRAMES_TYPES = TEMPORAL_NUMERIC_BIGFRAMES_TYPES + [TIME_DTYPE]
 
 
 # dtype predicates - use these to maintain consistency

From 2953c9bc86c705e834d0b311d6a04e0b753d8fe3 Mon Sep 17 00:00:00 2001
From: Shenyang Cai
Date: Fri, 6 Dec 2024 19:43:13 +0000
Subject: [PATCH 3/9] directly use output_type to check agg op support for input type

---
 bigframes/dataframe.py                     |  8 +--
 bigframes/operations/aggregations.py       | 14 ----
 tests/unit/operations/test_aggregations.py | 83 ----------------------
 3 files changed, 1 insertion(+), 104 deletions(-)
 delete mode 100644 tests/unit/operations/test_aggregations.py

diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py
index e7cf28ef02..18b4a09865 100644
--- a/bigframes/dataframe.py
+++ b/bigframes/dataframe.py
@@ -2257,13 +2257,7 @@ def agg(
             aggregations = [agg_ops.lookup_agg_func(f) for f in func]
 
             for dtype, agg in itertools.product(self.dtypes, aggregations):
-                if not bigframes.operations.aggregations.is_agg_op_supported(
-                    dtype, agg
-                ):
-                    raise NotImplementedError(
-                        f"Type {dtype} does not support aggregation {agg}. "
-                        f"Share your usecase with the BigQuery DataFrames team at the {constants.FEEDBACK_LINK}"
-                    )
+                agg.output_type(dtype)  # Raises exception if the agg does not support the dtype.
 
             return DataFrame(
                 self._block.summarize(
diff --git a/bigframes/operations/aggregations.py b/bigframes/operations/aggregations.py
index 8d20d8c454..6b7f56d708 100644
--- a/bigframes/operations/aggregations.py
+++ b/bigframes/operations/aggregations.py
@@ -579,17 +579,3 @@ def lookup_agg_func(key: str) -> typing.Union[UnaryAggregateOp, NullaryAggregate
         return _AGGREGATIONS_LOOKUP[key]
     else:
         raise ValueError(f"Unrecognize aggregate function: {key}")
-
-
-def is_agg_op_supported(dtype: dtypes.Dtype, op: AggregateOp) -> bool:
-    if dtype in dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE:
-        return True
-
-    if dtype in dtypes.TEMPORAL_BIGFRAMES_TYPES:
-        return isinstance(op, (CountOp, NuniqueOp, MinOp, MaxOp))
-
-    if dtype in (dtypes.STRING_DTYPE, dtypes.BOOL_DTYPE, dtypes.BYTES_DTYPE):
-        return isinstance(op, (CountOp, NuniqueOp))
-
-    # For all other types, support no aggregation
-    return False
diff --git a/tests/unit/operations/test_aggregations.py b/tests/unit/operations/test_aggregations.py
deleted file mode 100644
index 7554e61b96..0000000000
--- a/tests/unit/operations/test_aggregations.py
+++ /dev/null
@@ -1,83 +0,0 @@
-# Copyright 2024 Google LLC
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
- -import pytest - -import bigframes.dtypes as dtypes -from bigframes.operations.aggregations import ( - all_op, - any_op, - count_op, - dense_rank_op, - first_op, - is_agg_op_supported, - max_op, - mean_op, - median_op, - min_op, - nunique_op, - product_op, - rank_op, - size_op, - std_op, - sum_op, - var_op, -) - -_ALL_OPS = set( - [ - size_op, - sum_op, - mean_op, - median_op, - product_op, - max_op, - min_op, - std_op, - var_op, - count_op, - nunique_op, - rank_op, - dense_rank_op, - all_op, - any_op, - first_op, - ] -) - - -@pytest.mark.parametrize("dtype", dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE) -@pytest.mark.parametrize("op", _ALL_OPS) -def test_is_agg_op_supported_numeric_support_all(dtype, op): - assert is_agg_op_supported(dtype, op) is True - - -@pytest.mark.parametrize( - ("dtype", "supported_ops"), - [ - (dtypes.STRING_DTYPE, {count_op, nunique_op}), - (dtypes.BYTES_DTYPE, {count_op, nunique_op}), - (dtypes.DATE_DTYPE, {count_op, nunique_op, min_op, max_op}), - (dtypes.TIME_DTYPE, {count_op, nunique_op, min_op, max_op}), - (dtypes.DATETIME_DTYPE, {count_op, nunique_op, min_op, max_op}), - (dtypes.TIMESTAMP_DTYPE, {count_op, nunique_op, min_op, max_op}), - (dtypes.GEO_DTYPE, set()), - ], -) -def test_is_agg_op_supported_non_numeric(dtype, supported_ops): - for op in supported_ops: - assert is_agg_op_supported(dtype, op) is True - - for op in _ALL_OPS - supported_ops: - assert is_agg_op_supported(dtype, op) is False From b1c82b43ec8e0bd34d93e471348c78fa222c6d68 Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Fri, 6 Dec 2024 19:45:15 +0000 Subject: [PATCH 4/9] format code --- bigframes/dataframe.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 18b4a09865..734b54609c 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -2257,7 +2257,9 @@ def agg( aggregations = [agg_ops.lookup_agg_func(f) for f in func] for dtype, agg in itertools.product(self.dtypes, aggregations): - agg.output_type(dtype) # Raises exception if the agg does not support the dtype. + agg.output_type( + dtype + ) # Raises exception if the agg does not support the dtype. return DataFrame( self._block.summarize( From ebe3e1c86996697a376db683feeebbd488cc697d Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Fri, 6 Dec 2024 19:46:04 +0000 Subject: [PATCH 5/9] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20po?= =?UTF-8?q?st-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- bigframes/dataframe.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 18b4a09865..734b54609c 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -2257,7 +2257,9 @@ def agg( aggregations = [agg_ops.lookup_agg_func(f) for f in func] for dtype, agg in itertools.product(self.dtypes, aggregations): - agg.output_type(dtype) # Raises exception if the agg does not support the dtype. + agg.output_type( + dtype + ) # Raises exception if the agg does not support the dtype. return DataFrame( self._block.summarize( From 7d464ce0d87682ae93a58f1e8b9d883fd4992d95 Mon Sep 17 00:00:00 2001 From: Huan Chen <142538604+Genesis929@users.noreply.github.com> Date: Mon, 9 Dec 2024 10:36:05 -0800 Subject: [PATCH 6/9] perf: update df.corr, df.cov to be used with more than 30 columns case. 
(#1161) * perf: update df.corr, df.cov to be used with more than 30 columns case. * add large test * remove print * fix_index * fix index * test fix * fix test * fix test * slightly improve multi_apply_unary_op to avoid RecursionError * update recursion limit for nox session * skip the test in e2e/python 3.12 * simplify code * simplify code --- bigframes/core/blocks.py | 49 +------ bigframes/dataframe.py | 192 +++++++++++++++++++++++++- tests/system/conftest.py | 22 +++ tests/system/large/test_dataframe.py | 42 ++++++ tests/system/small/test_dataframe.py | 9 +- tests/system/small/test_multiindex.py | 8 +- 6 files changed, 268 insertions(+), 54 deletions(-) create mode 100644 tests/system/large/test_dataframe.py diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 574bed00eb..a9139dfee6 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -912,6 +912,8 @@ def multi_apply_unary_op( input_varname = input_varnames[0] block = self + + result_ids = [] for col_id in columns: label = self.col_id_to_label[col_id] block, result_id = block.project_expr( @@ -919,7 +921,8 @@ def multi_apply_unary_op( label=label, ) block = block.copy_values(result_id, col_id) - block = block.drop_columns([result_id]) + result_ids.append(result_id) + block = block.drop_columns(result_ids) # Special case, we can preserve transpose cache for full-frame unary ops if (self._transpose_cache is not None) and set(self.value_columns) == set( columns @@ -1317,50 +1320,6 @@ def summarize( index_columns=index_cols, ) - def calculate_pairwise_metric(self, op=agg_ops.CorrOp()): - """ - Returns a block object to compute pairwise metrics among all value columns in this block. - - The metric to be computed is specified by the `op` parameter, which can be either a - correlation operation (default) or a covariance operation. - """ - if len(self.value_columns) > 30: - raise NotImplementedError( - "This function supports dataframes with 30 columns or fewer. " - f"Provided dataframe has {len(self.value_columns)} columns. 
{constants.FEEDBACK_LINK}" - ) - - aggregations = [ - ( - ex.BinaryAggregation(op, ex.deref(left_col), ex.deref(right_col)), - f"{left_col}-{right_col}", - ) - for left_col in self.value_columns - for right_col in self.value_columns - ] - expr = self.expr.aggregate(aggregations) - - input_count = len(self.value_columns) - unpivot_columns = tuple( - tuple(expr.column_ids[input_count * i : input_count * (i + 1)]) - for i in range(input_count) - ) - labels = self._get_labels_for_columns(self.value_columns) - - # TODO(b/340896143): fix type error - expr, (index_col_ids, _, _) = unpivot( - expr, - row_labels=labels, - unpivot_columns=unpivot_columns, - ) - - return Block( - expr, - column_labels=self.column_labels, - index_columns=index_col_ids, - index_labels=self.column_labels.names, - ) - def explode( self, column_ids: typing.Sequence[str], diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 734b54609c..2e163fff7c 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -1208,7 +1208,107 @@ def corr(self, method="pearson", min_periods=None, numeric_only=False) -> DataFr else: frame = self._drop_non_numeric() - return DataFrame(frame._block.calculate_pairwise_metric(op=agg_ops.CorrOp())) + orig_columns = frame.columns + # Replace column names with 0 to n - 1 to keep order + # and avoid the influence of duplicated column name + frame.columns = pandas.Index(range(len(orig_columns))) + frame = frame.astype(bigframes.dtypes.FLOAT_DTYPE) + block = frame._block + + # A new column that uniquely identifies each row + block, ordering_col = frame._block.promote_offsets(label="_bigframes_idx") + + val_col_ids = [ + col_id for col_id in block.value_columns if col_id != ordering_col + ] + + block = block.melt( + [ordering_col], val_col_ids, ["_bigframes_variable"], "_bigframes_value" + ) + + block = block.merge( + block, + left_join_ids=[ordering_col], + right_join_ids=[ordering_col], + how="inner", + sort=False, + ) + + frame = DataFrame(block).dropna( + subset=["_bigframes_value_x", "_bigframes_value_y"] + ) + + paired_mean_frame = ( + frame.groupby(["_bigframes_variable_x", "_bigframes_variable_y"]) + .agg( + _bigframes_paired_mean_x=bigframes.pandas.NamedAgg( + column="_bigframes_value_x", aggfunc="mean" + ), + _bigframes_paired_mean_y=bigframes.pandas.NamedAgg( + column="_bigframes_value_y", aggfunc="mean" + ), + ) + .reset_index() + ) + + frame = frame.merge( + paired_mean_frame, on=["_bigframes_variable_x", "_bigframes_variable_y"] + ) + frame["_bigframes_value_x"] -= frame["_bigframes_paired_mean_x"] + frame["_bigframes_value_y"] -= frame["_bigframes_paired_mean_y"] + + frame["_bigframes_dividend"] = ( + frame["_bigframes_value_x"] * frame["_bigframes_value_y"] + ) + frame["_bigframes_x_square"] = ( + frame["_bigframes_value_x"] * frame["_bigframes_value_x"] + ) + frame["_bigframes_y_square"] = ( + frame["_bigframes_value_y"] * frame["_bigframes_value_y"] + ) + + result = ( + frame.groupby(["_bigframes_variable_x", "_bigframes_variable_y"]) + .agg( + _bigframes_dividend_sum=bigframes.pandas.NamedAgg( + column="_bigframes_dividend", aggfunc="sum" + ), + _bigframes_x_square_sum=bigframes.pandas.NamedAgg( + column="_bigframes_x_square", aggfunc="sum" + ), + _bigframes_y_square_sum=bigframes.pandas.NamedAgg( + column="_bigframes_y_square", aggfunc="sum" + ), + ) + .reset_index() + ) + result["_bigframes_corr"] = result["_bigframes_dividend_sum"] / ( + ( + result["_bigframes_x_square_sum"] * result["_bigframes_y_square_sum"] + )._apply_unary_op(ops.sqrt_op) + ) + result = 
result._pivot( + index="_bigframes_variable_x", + columns="_bigframes_variable_y", + values="_bigframes_corr", + ) + + map_data = { + f"_bigframes_level_{i}": orig_columns.get_level_values(i) + for i in range(orig_columns.nlevels) + } + map_data["_bigframes_keys"] = range(len(orig_columns)) + map_df = bigframes.dataframe.DataFrame( + map_data, + session=self._get_block().expr.session, + ).set_index("_bigframes_keys") + result = result.join(map_df).sort_index() + index_columns = [f"_bigframes_level_{i}" for i in range(orig_columns.nlevels)] + result = result.set_index(index_columns) + result.index.names = orig_columns.names + result.columns = orig_columns + + return result def cov(self, *, numeric_only: bool = False) -> DataFrame: if not numeric_only: @@ -1216,7 +1316,95 @@ def cov(self, *, numeric_only: bool = False) -> DataFrame: else: frame = self._drop_non_numeric() - return DataFrame(frame._block.calculate_pairwise_metric(agg_ops.CovOp())) + orig_columns = frame.columns + # Replace column names with 0 to n - 1 to keep order + # and avoid the influence of duplicated column name + frame.columns = pandas.Index(range(len(orig_columns))) + frame = frame.astype(bigframes.dtypes.FLOAT_DTYPE) + block = frame._block + + # A new column that uniquely identifies each row + block, ordering_col = frame._block.promote_offsets(label="_bigframes_idx") + + val_col_ids = [ + col_id for col_id in block.value_columns if col_id != ordering_col + ] + + block = block.melt( + [ordering_col], val_col_ids, ["_bigframes_variable"], "_bigframes_value" + ) + block = block.merge( + block, + left_join_ids=[ordering_col], + right_join_ids=[ordering_col], + how="inner", + sort=False, + ) + + frame = DataFrame(block).dropna( + subset=["_bigframes_value_x", "_bigframes_value_y"] + ) + + paired_mean_frame = ( + frame.groupby(["_bigframes_variable_x", "_bigframes_variable_y"]) + .agg( + _bigframes_paired_mean_x=bigframes.pandas.NamedAgg( + column="_bigframes_value_x", aggfunc="mean" + ), + _bigframes_paired_mean_y=bigframes.pandas.NamedAgg( + column="_bigframes_value_y", aggfunc="mean" + ), + ) + .reset_index() + ) + + frame = frame.merge( + paired_mean_frame, on=["_bigframes_variable_x", "_bigframes_variable_y"] + ) + frame["_bigframes_value_x"] -= frame["_bigframes_paired_mean_x"] + frame["_bigframes_value_y"] -= frame["_bigframes_paired_mean_y"] + + frame["_bigframes_dividend"] = ( + frame["_bigframes_value_x"] * frame["_bigframes_value_y"] + ) + + result = ( + frame.groupby(["_bigframes_variable_x", "_bigframes_variable_y"]) + .agg( + _bigframes_dividend_sum=bigframes.pandas.NamedAgg( + column="_bigframes_dividend", aggfunc="sum" + ), + _bigframes_dividend_count=bigframes.pandas.NamedAgg( + column="_bigframes_dividend", aggfunc="count" + ), + ) + .reset_index() + ) + result["_bigframes_cov"] = result["_bigframes_dividend_sum"] / ( + result["_bigframes_dividend_count"] - 1 + ) + result = result._pivot( + index="_bigframes_variable_x", + columns="_bigframes_variable_y", + values="_bigframes_cov", + ) + + map_data = { + f"_bigframes_level_{i}": orig_columns.get_level_values(i) + for i in range(orig_columns.nlevels) + } + map_data["_bigframes_keys"] = range(len(orig_columns)) + map_df = bigframes.dataframe.DataFrame( + map_data, + session=self._get_block().expr.session, + ).set_index("_bigframes_keys") + result = result.join(map_df).sort_index() + index_columns = [f"_bigframes_level_{i}" for i in range(orig_columns.nlevels)] + result = result.set_index(index_columns) + result.index.names = orig_columns.names + 
result.columns = orig_columns + + return result def to_arrow( self, diff --git a/tests/system/conftest.py b/tests/system/conftest.py index fd6d7a80b0..e1cbf02780 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -611,6 +611,28 @@ def scalars_dfs_maybe_ordered( ) +@pytest.fixture(scope="session") +def scalars_df_numeric_150_columns_maybe_ordered( + maybe_ordered_session, + scalars_pandas_df_index, +): + """DataFrame pointing at test data.""" + # TODO(b/379911038): After the error fixed, add numeric type. + pandas_df = scalars_pandas_df_index.reset_index(drop=False)[ + [ + "rowindex", + "rowindex_2", + "float64_col", + "int64_col", + "int64_too", + ] + * 30 + ] + + df = maybe_ordered_session.read_pandas(pandas_df) + return (df, pandas_df) + + @pytest.fixture(scope="session") def hockey_df( hockey_table_id: str, session: bigframes.Session diff --git a/tests/system/large/test_dataframe.py b/tests/system/large/test_dataframe.py new file mode 100644 index 0000000000..20d383463a --- /dev/null +++ b/tests/system/large/test_dataframe.py @@ -0,0 +1,42 @@ +import sys + +import pandas as pd +import pytest + + +@pytest.mark.skipif( + sys.version_info >= (3, 12), + # See: https://github.com/python/cpython/issues/112282 + reason="setrecursionlimit has no effect on the Python C stack since Python 3.12.", +) +def test_corr_w_numeric_only(scalars_df_numeric_150_columns_maybe_ordered): + scalars_df, scalars_pandas_df = scalars_df_numeric_150_columns_maybe_ordered + bf_result = scalars_df.corr(numeric_only=True).to_pandas() + pd_result = scalars_pandas_df.corr(numeric_only=True) + + pd.testing.assert_frame_equal( + bf_result, + pd_result, + check_dtype=False, + check_index_type=False, + check_column_type=False, + ) + + +@pytest.mark.skipif( + sys.version_info >= (3, 12), + # See: https://github.com/python/cpython/issues/112282 + reason="setrecursionlimit has no effect on the Python C stack since Python 3.12.", +) +def test_cov_w_numeric_only(scalars_df_numeric_150_columns_maybe_ordered): + scalars_df, scalars_pandas_df = scalars_df_numeric_150_columns_maybe_ordered + bf_result = scalars_df.cov(numeric_only=True).to_pandas() + pd_result = scalars_pandas_df.cov(numeric_only=True) + + pd.testing.assert_frame_equal( + bf_result, + pd_result, + check_dtype=False, + check_index_type=False, + check_column_type=False, + ) diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index c9836756eb..bbcec90ea8 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -2075,8 +2075,8 @@ def test_combine_first( ), ], ) -def test_corr_w_numeric_only(scalars_dfs, columns, numeric_only): - scalars_df, scalars_pandas_df = scalars_dfs +def test_corr_w_numeric_only(scalars_dfs_maybe_ordered, columns, numeric_only): + scalars_df, scalars_pandas_df = scalars_dfs_maybe_ordered bf_result = scalars_df[columns].corr(numeric_only=numeric_only).to_pandas() pd_result = scalars_pandas_df[columns].corr(numeric_only=numeric_only) @@ -2115,11 +2115,10 @@ def test_corr_w_invalid_parameters(scalars_dfs): ), ], ) -def test_cov_w_numeric_only(scalars_dfs, columns, numeric_only): - scalars_df, scalars_pandas_df = scalars_dfs +def test_cov_w_numeric_only(scalars_dfs_maybe_ordered, columns, numeric_only): + scalars_df, scalars_pandas_df = scalars_dfs_maybe_ordered bf_result = scalars_df[columns].cov(numeric_only=numeric_only).to_pandas() pd_result = scalars_pandas_df[columns].cov(numeric_only=numeric_only) - # BigFrames and Pandas differ in their data type 
handling: # - Column types: BigFrames uses Float64, Pandas uses float64. # - Index types: BigFrames uses strign, Pandas uses object. diff --git a/tests/system/small/test_multiindex.py b/tests/system/small/test_multiindex.py index cab74f617d..1c78ac63d9 100644 --- a/tests/system/small/test_multiindex.py +++ b/tests/system/small/test_multiindex.py @@ -910,7 +910,9 @@ def test_column_multi_index_unstack(scalars_df_index, scalars_pandas_df_index): def test_corr_w_multi_index(scalars_df_index, scalars_pandas_df_index): columns = ["int64_too", "float64_col", "int64_col"] - multi_columns = pandas.MultiIndex.from_tuples(zip(["a", "b", "b"], [1, 2, 2])) + multi_columns = pandas.MultiIndex.from_tuples( + zip(["a", "b", "b"], [1, 2, 2]), names=[None, "level_2"] + ) bf = scalars_df_index[columns].copy() bf.columns = multi_columns @@ -931,7 +933,9 @@ def test_corr_w_multi_index(scalars_df_index, scalars_pandas_df_index): def test_cov_w_multi_index(scalars_df_index, scalars_pandas_df_index): columns = ["int64_too", "float64_col", "int64_col"] - multi_columns = pandas.MultiIndex.from_tuples(zip(["a", "b", "b"], [1, 2, 2])) + multi_columns = pandas.MultiIndex.from_tuples( + zip(["a", "b", "b"], [1, 2, 2]), names=["level_1", None] + ) bf = scalars_df_index[columns].copy() bf.columns = multi_columns From 585731ee5cb0ddad9c4b2c2b8884c3962a3da9ce Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Mon, 9 Dec 2024 11:43:16 -0800 Subject: [PATCH 7/9] refactor: consolidate reshaping functions under the reshape package (#1194) * consolidate reshaping functions under the reshape package * format code * fix import * fix import * lint code --- bigframes/core/reshape/__init__.py | 187 ------------------ .../{joins/__init__.py => reshape/api.py} | 12 +- bigframes/core/reshape/concat.py | 106 ++++++++++ bigframes/core/{joins => reshape}/merge.py | 5 + bigframes/core/reshape/tile.py | 129 ++++++++++++ bigframes/dataframe.py | 21 +- bigframes/operations/semantics.py | 2 +- bigframes/pandas/__init__.py | 128 +----------- bigframes/pandas/io/api.py | 2 - 9 files changed, 260 insertions(+), 332 deletions(-) rename bigframes/core/{joins/__init__.py => reshape/api.py} (71%) create mode 100644 bigframes/core/reshape/concat.py rename bigframes/core/{joins => reshape}/merge.py (94%) create mode 100644 bigframes/core/reshape/tile.py diff --git a/bigframes/core/reshape/__init__.py b/bigframes/core/reshape/__init__.py index 49ecedcc87..1dc90d1848 100644 --- a/bigframes/core/reshape/__init__.py +++ b/bigframes/core/reshape/__init__.py @@ -11,190 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from __future__ import annotations - -import typing -from typing import Iterable, Literal, Optional, Union - -import bigframes_vendored.constants as constants -import pandas as pd - -import bigframes.core.expression as ex -import bigframes.core.ordering as order -import bigframes.core.utils as utils -import bigframes.core.window_spec as window_specs -import bigframes.dataframe -import bigframes.operations as ops -import bigframes.operations.aggregations as agg_ops -import bigframes.series - - -@typing.overload -def concat( - objs: Iterable[bigframes.series.Series], - *, - axis: typing.Literal["index", 0] = ..., - join=..., - ignore_index=..., -) -> bigframes.series.Series: - ... 
- - -@typing.overload -def concat( - objs: Iterable[bigframes.dataframe.DataFrame], - *, - axis: typing.Literal["index", 0] = ..., - join=..., - ignore_index=..., -) -> bigframes.dataframe.DataFrame: - ... - - -@typing.overload -def concat( - objs: Iterable[Union[bigframes.dataframe.DataFrame, bigframes.series.Series]], - *, - axis: typing.Literal["columns", 1], - join=..., - ignore_index=..., -) -> bigframes.dataframe.DataFrame: - ... - - -@typing.overload -def concat( - objs: Iterable[Union[bigframes.dataframe.DataFrame, bigframes.series.Series]], - *, - axis=..., - join=..., - ignore_index=..., -) -> Union[bigframes.dataframe.DataFrame, bigframes.series.Series]: - ... - - -def concat( - objs: Iterable[Union[bigframes.dataframe.DataFrame, bigframes.series.Series]], - *, - axis: typing.Union[str, int] = 0, - join: Literal["inner", "outer"] = "outer", - ignore_index: bool = False, -) -> Union[bigframes.dataframe.DataFrame, bigframes.series.Series]: - axis_n = utils.get_axis_number(axis) - if axis_n == 0: - contains_dataframes = any( - isinstance(x, bigframes.dataframe.DataFrame) for x in objs - ) - if not contains_dataframes: - # Special case, all series, so align everything into single column even if labels don't match - series = typing.cast(typing.Iterable[bigframes.series.Series], objs) - names = {s.name for s in series} - # For series case, labels are stripped if they don't all match - if len(names) > 1: - blocks = [s._block.with_column_labels([None]) for s in series] - else: - blocks = [s._block for s in series] - block = blocks[0].concat(blocks[1:], how=join, ignore_index=ignore_index) - return bigframes.series.Series(block) - blocks = [obj._block for obj in objs] - block = blocks[0].concat(blocks[1:], how=join, ignore_index=ignore_index) - return bigframes.dataframe.DataFrame(block) - else: - # Note: does not validate inputs - block_list = [obj._block for obj in objs] - block = block_list[0] - for rblock in block_list[1:]: - block, _ = block.join(rblock, how=join) - return bigframes.dataframe.DataFrame(block) - - -def cut( - x: bigframes.series.Series, - bins: Union[ - int, - pd.IntervalIndex, - Iterable, - ], - *, - labels: Union[Iterable[str], bool, None] = None, -) -> bigframes.series.Series: - if isinstance(bins, int) and bins <= 0: - raise ValueError("`bins` should be a positive integer.") - - if isinstance(bins, Iterable): - if isinstance(bins, pd.IntervalIndex): - as_index: pd.IntervalIndex = bins - bins = tuple((bin.left.item(), bin.right.item()) for bin in bins) - elif len(list(bins)) == 0: - raise ValueError("`bins` iterable should have at least one item") - elif isinstance(list(bins)[0], tuple): - as_index = pd.IntervalIndex.from_tuples(list(bins)) - bins = tuple(bins) - elif pd.api.types.is_number(list(bins)[0]): - bins_list = list(bins) - if len(bins_list) < 2: - raise ValueError( - "`bins` iterable of numeric breaks should have" - " at least two items" - ) - as_index = pd.IntervalIndex.from_breaks(bins_list) - single_type = all([isinstance(n, type(bins_list[0])) for n in bins_list]) - numeric_type = type(bins_list[0]) if single_type else float - bins = tuple( - [ - (numeric_type(bins_list[i]), numeric_type(bins_list[i + 1])) - for i in range(len(bins_list) - 1) - ] - ) - else: - raise ValueError("`bins` iterable should contain tuples or numerics") - - if as_index.is_overlapping: - raise ValueError("Overlapping IntervalIndex is not accepted.") - - if labels is not None and labels is not False: - raise NotImplementedError( - "The 'labels' parameter must be either False 
or None. " - "Please provide a valid value for 'labels'." - ) - - return x._apply_window_op( - agg_ops.CutOp(bins, labels=labels), window_spec=window_specs.unbound() - ) - - -def qcut( - x: bigframes.series.Series, - q: typing.Union[int, typing.Sequence[float]], - *, - labels: Optional[bool] = None, - duplicates: typing.Literal["drop", "error"] = "error", -) -> bigframes.series.Series: - if isinstance(q, int) and q <= 0: - raise ValueError("`q` should be a positive integer.") - if utils.is_list_like(q): - q = tuple(q) - - if labels is not False: - raise NotImplementedError( - f"Only labels=False is supported in BigQuery DataFrames so far. {constants.FEEDBACK_LINK}" - ) - if duplicates != "drop": - raise NotImplementedError( - f"Only duplicates='drop' is supported in BigQuery DataFrames so far. {constants.FEEDBACK_LINK}" - ) - block = x._block - label = block.col_id_to_label[x._value_column] - block, nullity_id = block.apply_unary_op(x._value_column, ops.notnull_op) - block, result = block.apply_window_op( - x._value_column, - agg_ops.QcutOp(q), # type: ignore - window_spec=window_specs.unbound( - grouping_keys=(nullity_id,), - ordering=(order.ascending_over(x._value_column),), - ), - ) - block, result = block.project_expr( - ops.where_op.as_expr(result, nullity_id, ex.const(None)), label=label - ) - return bigframes.series.Series(block.select_column(result)) diff --git a/bigframes/core/joins/__init__.py b/bigframes/core/reshape/api.py similarity index 71% rename from bigframes/core/joins/__init__.py rename to bigframes/core/reshape/api.py index 3c5b9605a3..234ca4a2f9 100644 --- a/bigframes/core/joins/__init__.py +++ b/bigframes/core/reshape/api.py @@ -1,4 +1,4 @@ -# Copyright 2023 Google LLC +# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,10 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Helpers to join ArrayValue objects.""" +from bigframes.core.reshape.concat import concat +from bigframes.core.reshape.merge import merge +from bigframes.core.reshape.tile import cut, qcut -from bigframes.core.joins.merge import merge - -__all__ = [ - "merge", -] +__all__ = ["concat", "cut", "qcut", "merge"] diff --git a/bigframes/core/reshape/concat.py b/bigframes/core/reshape/concat.py new file mode 100644 index 0000000000..a42488cbe8 --- /dev/null +++ b/bigframes/core/reshape/concat.py @@ -0,0 +1,106 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import annotations + +import typing +from typing import Iterable, Literal, Union + +import bigframes_vendored.pandas.core.reshape.concat as vendored_pandas_concat + +import bigframes.core.utils as utils +import bigframes.dataframe +import bigframes.series + + +@typing.overload +def concat( + objs: Iterable[bigframes.series.Series], + *, + axis: typing.Literal["index", 0] = ..., + join=..., + ignore_index=..., +) -> bigframes.series.Series: + ... + + +@typing.overload +def concat( + objs: Iterable[bigframes.dataframe.DataFrame], + *, + axis: typing.Literal["index", 0] = ..., + join=..., + ignore_index=..., +) -> bigframes.dataframe.DataFrame: + ... + + +@typing.overload +def concat( + objs: Iterable[Union[bigframes.dataframe.DataFrame, bigframes.series.Series]], + *, + axis: typing.Literal["columns", 1], + join=..., + ignore_index=..., +) -> bigframes.dataframe.DataFrame: + ... + + +@typing.overload +def concat( + objs: Iterable[Union[bigframes.dataframe.DataFrame, bigframes.series.Series]], + *, + axis=..., + join=..., + ignore_index=..., +) -> Union[bigframes.dataframe.DataFrame, bigframes.series.Series]: + ... + + +def concat( + objs: Iterable[Union[bigframes.dataframe.DataFrame, bigframes.series.Series]], + *, + axis: typing.Union[str, int] = 0, + join: Literal["inner", "outer"] = "outer", + ignore_index: bool = False, +) -> Union[bigframes.dataframe.DataFrame, bigframes.series.Series]: + axis_n = utils.get_axis_number(axis) + if axis_n == 0: + contains_dataframes = any( + isinstance(x, bigframes.dataframe.DataFrame) for x in objs + ) + if not contains_dataframes: + # Special case, all series, so align everything into single column even if labels don't match + series = typing.cast(typing.Iterable[bigframes.series.Series], objs) + names = {s.name for s in series} + # For series case, labels are stripped if they don't all match + if len(names) > 1: + blocks = [s._block.with_column_labels([None]) for s in series] + else: + blocks = [s._block for s in series] + block = blocks[0].concat(blocks[1:], how=join, ignore_index=ignore_index) + return bigframes.series.Series(block) + blocks = [obj._block for obj in objs] + block = blocks[0].concat(blocks[1:], how=join, ignore_index=ignore_index) + return bigframes.dataframe.DataFrame(block) + else: + # Note: does not validate inputs + block_list = [obj._block for obj in objs] + block = block_list[0] + for rblock in block_list[1:]: + block, _ = block.join(rblock, how=join) + return bigframes.dataframe.DataFrame(block) + + +concat.__doc__ = vendored_pandas_concat.concat.__doc__ diff --git a/bigframes/core/joins/merge.py b/bigframes/core/reshape/merge.py similarity index 94% rename from bigframes/core/joins/merge.py rename to bigframes/core/reshape/merge.py index 1542cda0af..e1750d5c7a 100644 --- a/bigframes/core/joins/merge.py +++ b/bigframes/core/reshape/merge.py @@ -21,6 +21,8 @@ import typing from typing import Literal, Optional +import bigframes_vendored.pandas.core.reshape.merge as vendored_pandas_merge + # Avoid cirular imports. 
if typing.TYPE_CHECKING: import bigframes.dataframe @@ -58,6 +60,9 @@ def merge( ) +merge.__doc__ = vendored_pandas_merge.merge.__doc__ + + def _validate_operand( obj: bigframes.dataframe.DataFrame | bigframes.series.Series, ) -> bigframes.dataframe.DataFrame: diff --git a/bigframes/core/reshape/tile.py b/bigframes/core/reshape/tile.py new file mode 100644 index 0000000000..2a2ca9de95 --- /dev/null +++ b/bigframes/core/reshape/tile.py @@ -0,0 +1,129 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import typing +from typing import Iterable, Optional, Union + +import bigframes_vendored.constants as constants +import bigframes_vendored.pandas.core.reshape.tile as vendored_pandas_tile +import pandas as pd + +import bigframes.core.expression as ex +import bigframes.core.ordering as order +import bigframes.core.utils as utils +import bigframes.core.window_spec as window_specs +import bigframes.dataframe +import bigframes.operations as ops +import bigframes.operations.aggregations as agg_ops +import bigframes.series + + +def cut( + x: bigframes.series.Series, + bins: Union[ + int, + pd.IntervalIndex, + Iterable, + ], + *, + labels: Union[Iterable[str], bool, None] = None, +) -> bigframes.series.Series: + if isinstance(bins, int) and bins <= 0: + raise ValueError("`bins` should be a positive integer.") + + if isinstance(bins, Iterable): + if isinstance(bins, pd.IntervalIndex): + as_index: pd.IntervalIndex = bins + bins = tuple((bin.left.item(), bin.right.item()) for bin in bins) + elif len(list(bins)) == 0: + raise ValueError("`bins` iterable should have at least one item") + elif isinstance(list(bins)[0], tuple): + as_index = pd.IntervalIndex.from_tuples(list(bins)) + bins = tuple(bins) + elif pd.api.types.is_number(list(bins)[0]): + bins_list = list(bins) + if len(bins_list) < 2: + raise ValueError( + "`bins` iterable of numeric breaks should have" + " at least two items" + ) + as_index = pd.IntervalIndex.from_breaks(bins_list) + single_type = all([isinstance(n, type(bins_list[0])) for n in bins_list]) + numeric_type = type(bins_list[0]) if single_type else float + bins = tuple( + [ + (numeric_type(bins_list[i]), numeric_type(bins_list[i + 1])) + for i in range(len(bins_list) - 1) + ] + ) + else: + raise ValueError("`bins` iterable should contain tuples or numerics") + + if as_index.is_overlapping: + raise ValueError("Overlapping IntervalIndex is not accepted.") + + if labels is not None and labels is not False: + raise NotImplementedError( + "The 'labels' parameter must be either False or None. " + "Please provide a valid value for 'labels'." 
+ ) + + return x._apply_window_op( + agg_ops.CutOp(bins, labels=labels), window_spec=window_specs.unbound() + ) + + +cut.__doc__ = vendored_pandas_tile.cut.__doc__ + + +def qcut( + x: bigframes.series.Series, + q: typing.Union[int, typing.Sequence[float]], + *, + labels: Optional[bool] = None, + duplicates: typing.Literal["drop", "error"] = "error", +) -> bigframes.series.Series: + if isinstance(q, int) and q <= 0: + raise ValueError("`q` should be a positive integer.") + if utils.is_list_like(q): + q = tuple(q) + + if labels is not False: + raise NotImplementedError( + f"Only labels=False is supported in BigQuery DataFrames so far. {constants.FEEDBACK_LINK}" + ) + if duplicates != "drop": + raise NotImplementedError( + f"Only duplicates='drop' is supported in BigQuery DataFrames so far. {constants.FEEDBACK_LINK}" + ) + block = x._block + label = block.col_id_to_label[x._value_column] + block, nullity_id = block.apply_unary_op(x._value_column, ops.notnull_op) + block, result = block.apply_window_op( + x._value_column, + agg_ops.QcutOp(q), # type: ignore + window_spec=window_specs.unbound( + grouping_keys=(nullity_id,), + ordering=(order.ascending_over(x._value_column),), + ), + ) + block, result = block.project_expr( + ops.where_op.as_expr(result, nullity_id, ex.const(None)), label=label + ) + return bigframes.series.Series(block.select_column(result)) + + +qcut.__doc__ = vendored_pandas_tile.qcut.__doc__ diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 2e163fff7c..cd52fef74b 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -77,7 +77,6 @@ import bigframes.operations.semantics import bigframes.operations.structs import bigframes.series -import bigframes.series as bf_series import bigframes.session._io.bigquery if typing.TYPE_CHECKING: @@ -141,9 +140,13 @@ def __init__( elif ( utils.is_dict_like(data) and len(data) >= 1 - and any(isinstance(data[key], bf_series.Series) for key in data.keys()) + and any( + isinstance(data[key], bigframes.series.Series) for key in data.keys() + ) ): - if not all(isinstance(data[key], bf_series.Series) for key in data.keys()): + if not all( + isinstance(data[key], bigframes.series.Series) for key in data.keys() + ): # TODO(tbergeron): Support local list/series data by converting to memtable. raise NotImplementedError( f"Cannot mix Series with other types. {constants.FEEDBACK_LINK}" @@ -151,13 +154,13 @@ def __init__( keys = list(data.keys()) first_label, first_series = keys[0], data[keys[0]] block = ( - typing.cast(bf_series.Series, first_series) + typing.cast(bigframes.series.Series, first_series) ._get_block() .with_column_labels([first_label]) ) for key in keys[1:]: - other = typing.cast(bf_series.Series, data[key]) + other = typing.cast(bigframes.series.Series, data[key]) other_block = other._block.with_column_labels([key]) # Pandas will keep original sorting if all indices are aligned. # We cannot detect this easily however, and so always sort on index @@ -1184,7 +1187,7 @@ def combine( results.append(result) if all([isinstance(val, bigframes.series.Series) for val in results]): - import bigframes.core.reshape as rs + import bigframes.core.reshape.api as rs return rs.concat(results, axis=1) else: @@ -2536,7 +2539,7 @@ def describe(self, include: None | Literal["all"] = None) -> DataFrame: elif len(non_numeric_result.columns) == 0: return numeric_result else: - import bigframes.core.reshape as rs + import bigframes.core.reshape.api as rs # Use reindex after join to preserve the original column order. 
return rs.concat( @@ -4025,7 +4028,7 @@ def _cached(self, *, force: bool = False) -> DataFrame: @validations.requires_ordering() def dot(self, other: _DataFrameOrSeries) -> _DataFrameOrSeries: - if not isinstance(other, (DataFrame, bf_series.Series)): + if not isinstance(other, (DataFrame, bigframes.series.Series)): raise NotImplementedError( f"Only DataFrame or Series operand is supported. {constants.FEEDBACK_LINK}" ) @@ -4100,7 +4103,7 @@ def get_right_id(id): ) result = result[other_frame.columns] - if isinstance(other, bf_series.Series): + if isinstance(other, bigframes.series.Series): # There should be exactly one column in the result result = result[result.columns[0]].rename() diff --git a/bigframes/operations/semantics.py b/bigframes/operations/semantics.py index c605f30765..79b92afe4f 100644 --- a/bigframes/operations/semantics.py +++ b/bigframes/operations/semantics.py @@ -481,7 +481,7 @@ def map( )["ml_generate_text_llm_result"], ) - from bigframes.core.reshape import concat + from bigframes.core.reshape.api import concat return concat([self._df, results.rename(output_column)], axis=1) diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index 721736247c..33b41bb851 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -21,13 +21,10 @@ import inspect import sys import typing -from typing import Any, Iterable, List, Literal, Optional, Sequence, Tuple, Union +from typing import Any, List, Literal, Optional, Sequence, Tuple, Union import bigframes_vendored.constants as constants -import bigframes_vendored.pandas.core.reshape.concat as vendored_pandas_concat import bigframes_vendored.pandas.core.reshape.encoding as vendored_pandas_encoding -import bigframes_vendored.pandas.core.reshape.merge as vendored_pandas_merge -import bigframes_vendored.pandas.core.reshape.tile as vendored_pandas_tile import bigframes_vendored.pandas.core.tools.datetimes as vendored_pandas_datetimes import pandas @@ -36,8 +33,7 @@ import bigframes.core.expression as ex import bigframes.core.global_session as global_session import bigframes.core.indexes -import bigframes.core.joins -import bigframes.core.reshape +from bigframes.core.reshape.api import concat, cut, merge, qcut import bigframes.core.tools import bigframes.dataframe import bigframes.enums @@ -71,81 +67,6 @@ # Include method definition so that the method appears in our docs for # bigframes.pandas general functions. -@typing.overload -def concat( - objs: Iterable[bigframes.series.Series], - *, - axis: typing.Literal["index", 0] = ..., - join=..., - ignore_index=..., -) -> bigframes.series.Series: - ... - - -@typing.overload -def concat( - objs: Iterable[bigframes.dataframe.DataFrame], - *, - axis: typing.Literal["index", 0] = ..., - join=..., - ignore_index=..., -) -> bigframes.dataframe.DataFrame: - ... - - -@typing.overload -def concat( - objs: Iterable[Union[bigframes.dataframe.DataFrame, bigframes.series.Series]], - *, - axis: typing.Literal["columns", 1], - join=..., - ignore_index=..., -) -> bigframes.dataframe.DataFrame: - ... - - -@typing.overload -def concat( - objs: Iterable[Union[bigframes.dataframe.DataFrame, bigframes.series.Series]], - *, - axis=..., - join=..., - ignore_index=..., -) -> Union[bigframes.dataframe.DataFrame, bigframes.series.Series]: - ... 
- - -def concat( - objs: Iterable[Union[bigframes.dataframe.DataFrame, bigframes.series.Series]], - *, - axis: typing.Union[str, int] = 0, - join: Literal["inner", "outer"] = "outer", - ignore_index: bool = False, -) -> Union[bigframes.dataframe.DataFrame, bigframes.series.Series]: - return bigframes.core.reshape.concat( - objs=objs, axis=axis, join=join, ignore_index=ignore_index - ) - - -concat.__doc__ = vendored_pandas_concat.concat.__doc__ - - -def cut( - x: bigframes.series.Series, - bins: int, - *, - labels: Union[Iterable[str], bool, None] = None, -) -> bigframes.series.Series: - return bigframes.core.reshape.cut( - x, - bins, - labels=labels, - ) - - -cut.__doc__ = vendored_pandas_tile.cut.__doc__ - - def get_dummies( data: Union[DataFrame, Series], prefix: Union[List, dict, str, None] = None, @@ -318,51 +239,6 @@ def _perform_get_dummies_block_operations( return block, intermediate_col_ids -def qcut( - x: bigframes.series.Series, - q: int, - *, - labels: Optional[bool] = None, - duplicates: typing.Literal["drop", "error"] = "error", -) -> bigframes.series.Series: - return bigframes.core.reshape.qcut(x, q, labels=labels, duplicates=duplicates) - - -qcut.__doc__ = vendored_pandas_tile.qcut.__doc__ - - -def merge( - left: DataFrame, - right: DataFrame, - how: Literal[ - "inner", - "left", - "outer", - "right", - "cross", - ] = "inner", - on: Optional[str] = None, - *, - left_on: Optional[str] = None, - right_on: Optional[str] = None, - sort: bool = False, - suffixes: tuple[str, str] = ("_x", "_y"), -) -> DataFrame: - return bigframes.core.joins.merge( - left, - right, - how=how, - on=on, - left_on=left_on, - right_on=right_on, - sort=sort, - suffixes=suffixes, - ) - - -merge.__doc__ = vendored_pandas_merge.merge.__doc__ - - def remote_function( input_types: Union[None, type, Sequence[type]] = None, output_type: Optional[type] = None, diff --git a/bigframes/pandas/io/api.py b/bigframes/pandas/io/api.py index 19ff5e0a05..c24651b43f 100644 --- a/bigframes/pandas/io/api.py +++ b/bigframes/pandas/io/api.py @@ -45,7 +45,6 @@ import bigframes.core.blocks import bigframes.core.global_session as global_session import bigframes.core.indexes -import bigframes.core.joins import bigframes.core.reshape import bigframes.core.tools import bigframes.dataframe @@ -54,7 +53,6 @@ import bigframes.session import bigframes.session._io.bigquery import bigframes.session.clients -import bigframes.version # Note: the following methods are duplicated from Session. 
This duplication
 # enables the following:

From d0407dbadfea3ea26db3e2cd20e70e90ae4101d0 Mon Sep 17 00:00:00 2001
From: TrevorBergeron
Date: Mon, 9 Dec 2024 12:22:53 -0800
Subject: [PATCH 8/9] feat: Series.isin supports bigframes.Series arg (#1195)

---
 bigframes/core/blocks.py          | 37 ++++++++++++++++++++++++++++
 bigframes/ml/utils.py             |  2 +-
 bigframes/series.py               |  3 ++-
 tests/system/small/test_series.py | 40 +++++++++++++++++++++++++++++++
 4 files changed, 80 insertions(+), 2 deletions(-)

diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py
index a9139dfee6..b548da2aa7 100644
--- a/bigframes/core/blocks.py
+++ b/bigframes/core/blocks.py
@@ -2019,6 +2019,43 @@ def concat(
         result_block = result_block.reset_index()
         return result_block
 
+    def isin(self, other: Block):
+        # TODO: Support multiple other columns and match on label
+        # TODO: Model as explicit "IN" subquery/join to better allow db to optimize
+        assert len(other.value_columns) == 1
+        unique_other_values = other.expr.select_columns(
+            [other.value_columns[0]]
+        ).aggregate((), by_column_ids=(other.value_columns[0],))
+        block = self
+        # for each original column, join with other
+        for i in range(len(self.value_columns)):
+            block = block._isin_inner(block.value_columns[i], unique_other_values)
+        return block
+
+    def _isin_inner(self: Block, col: str, unique_values: core.ArrayValue) -> Block:
+        unique_values, const = unique_values.create_constant(
+            True, dtype=bigframes.dtypes.BOOL_DTYPE
+        )
+        expr, (l_map, r_map) = self._expr.relational_join(
+            unique_values, ((col, unique_values.column_ids[0]),), type="left"
+        )
+        expr, matches = expr.project_to_id(
+            ops.eq_op.as_expr(ex.const(True), r_map[const])
+        )
+
+        new_index_cols = tuple(l_map[idx_col] for idx_col in self.index_columns)
+        new_value_cols = tuple(
+            l_map[val_col] if val_col != col else matches
+            for val_col in self.value_columns
+        )
+        expr = expr.select_columns((*new_index_cols, *new_value_cols))
+        return Block(
+            expr,
+            index_columns=new_index_cols,
+            column_labels=self.column_labels,
+            index_labels=self._index_labels,
+        )
+
     def merge(
         self,
         other: Block,
diff --git a/bigframes/ml/utils.py b/bigframes/ml/utils.py
index 72d7054f45..e1620485d5 100644
--- a/bigframes/ml/utils.py
+++ b/bigframes/ml/utils.py
@@ -92,7 +92,7 @@ def _get_only_column(input: ArrayType) -> Union[pd.Series, bpd.Series]:
     label = typing.cast(Hashable, input.columns.tolist()[0])
     if isinstance(input, pd.DataFrame):
         return typing.cast(pd.Series, input[label])
-    return typing.cast(bpd.Series, input[label])
+    return typing.cast(bpd.Series, input[label])  # type: ignore
 
 
 def parse_model_endpoint(model_endpoint: str) -> tuple[str, Optional[str]]:
diff --git a/bigframes/series.py b/bigframes/series.py
index b92da64aff..af09866bfe 100644
--- a/bigframes/series.py
+++ b/bigframes/series.py
@@ -718,12 +718,13 @@ def nsmallest(self, n: int = 5, keep: str = "first") -> Series:
         )
 
     def isin(self, values) -> "Series" | None:
+        if isinstance(values, (Series,)):
+            return Series(self._block.isin(values._block))
         if not _is_list_like(values):
            raise TypeError(
                "only list-like objects are allowed to be passed to "
                f"isin(), you passed a [{type(values).__name__}]"
            )
-
         return self._apply_unary_op(
             ops.IsInOp(values=tuple(values), match_nulls=True)
         ).fillna(value=False)
diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py
index 218708a19d..692b221a19 100644
--- a/tests/system/small/test_series.py
+++ b/tests/system/small/test_series.py
@@ -1200,6 +1200,46 @@ def test_isin(scalars_dfs, col_name, test_set):
     )
 
+@pytest.mark.parametrize( + ( + "col_name", + "test_set", + ), + [ + ( + "int64_col", + [314159, 2.0, 3, pd.NA], + ), + ( + "int64_col", + [2, 55555, 4], + ), + ( + "float64_col", + [-123.456, 1.25, pd.NA], + ), + ( + "int64_too", + [1, 2, pd.NA], + ), + ( + "string_col", + ["Hello, World!", "Hi", "こんにちは"], + ), + ], +) +def test_isin_bigframes_values(scalars_dfs, col_name, test_set, session): + scalars_df, scalars_pandas_df = scalars_dfs + bf_result = ( + scalars_df[col_name].isin(series.Series(test_set, session=session)).to_pandas() + ) + pd_result = scalars_pandas_df[col_name].isin(test_set).astype("boolean") + pd.testing.assert_series_equal( + pd_result, + bf_result, + ) + + def test_isnull(scalars_dfs): scalars_df, scalars_pandas_df = scalars_dfs col_name = "float64_col" From 753f281569b866cb2a6be6e68459c184d1130b86 Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Mon, 9 Dec 2024 21:57:29 +0000 Subject: [PATCH 9/9] fix import path --- bigframes/dataframe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index cd52fef74b..614a2fb919 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -2579,7 +2579,7 @@ def _describe_numeric(self) -> DataFrame: elif len(temporal_df_result.columns) == 0: return number_df_result else: - import bigframes.core.reshape as rs + import bigframes.core.reshape.api as rs original_columns = self._select_exact_dtypes( bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_RESTRICTIVE
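
For reviewers who want to try the series end to end, here is a minimal usage sketch of the behavior these patches add. It is illustrative only, not part of the patches: it assumes an authenticated environment in which bigframes.pandas can create a BigQuery session, and "my-project" plus the small toy input frame are placeholders.

import pandas as pd

import bigframes.pandas as bpd

bpd.options.bigquery.project = "my-project"  # placeholder project id

df = bpd.read_pandas(
    pd.DataFrame(
        {
            "ts": pd.to_datetime(
                ["2024-01-01", "2024-06-01", "2024-12-05"], utc=True
            ),
            "x": [1.0, 2.5, 3.5],
            "y": [2, 4, 8],
        }
    )
)

# Patch 1: datetime-like columns now survive describe(), contributing a
# "count" row alongside the numeric statistics instead of being dropped.
print(df.describe().to_pandas())

# Patch 6: corr()/cov() are computed via a melt + self-join, so the previous
# 30-column limit no longer applies.
print(df[["x", "y"]].corr().to_pandas())
print(df[["x", "y"]].cov().to_pandas())

# Patch 8: Series.isin now accepts another bigframes Series as its argument.
keys = bpd.read_pandas(pd.Series([2, 8]))
print(df["y"].isin(keys).to_pandas())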