From ad5789cfc65171bbd02a317ed53a4afe844e8779 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 9 May 2024 19:25:04 +0000 Subject: [PATCH 1/4] feat: add `Series.case_when()` feat: add `DataFrame.__delitem__` docs: add logistic regression samples --- bigframes/core/__init__.py | 17 ++- bigframes/core/blocks.py | 9 ++ bigframes/dataframe.py | 4 + bigframes/operations/__init__.py | 51 ++++--- bigframes/operations/base.py | 23 ++- bigframes/series.py | 19 +++ .../logistic_regression_prediction_test.py | 137 ++++++++++++++++++ tests/system/small/test_series.py | 24 +++ .../bigframes_vendored/pandas/core/series.py | 55 +++++++ 9 files changed, 305 insertions(+), 34 deletions(-) create mode 100644 samples/snippets/logistic_regression_prediction_test.py diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py index eef0efcf83..79c6bb6495 100644 --- a/bigframes/core/__init__.py +++ b/bigframes/core/__init__.py @@ -16,6 +16,7 @@ from dataclasses import dataclass import functools import io +import itertools import typing from typing import Iterable, Sequence @@ -370,14 +371,16 @@ def unpivot( for col_id, input_ids in unpivot_columns: # row explode offset used to choose the input column # we use offset instead of label as labels are not necessarily unique - cases = tuple( - ( - ops.eq_op.as_expr(explode_offsets_id, ex.const(i)), - ex.free_var(id_or_null) - if (id_or_null is not None) - else ex.const(None), + cases = itertools.chain( + *( + ( + ops.eq_op.as_expr(explode_offsets_id, ex.const(i)), + ex.free_var(id_or_null) + if (id_or_null is not None) + else ex.const(None), + ) + for i, id_or_null in enumerate(input_ids) ) - for i, id_or_null in enumerate(input_ids) ) col_expr = ops.case_when_op.as_expr(*cases) unpivot_exprs.append((col_expr, col_id)) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 402581eb6f..277409f3a3 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -803,6 +803,15 @@ def apply_ternary_op( expr = op.as_expr(col_id_1, col_id_2, col_id_3) return self.project_expr(expr, result_label) + def apply_nary_op( + self, + columns: Iterable[str], + op: ops.NaryOp, + result_label: Label = None, + ) -> typing.Tuple[Block, str]: + expr = op.as_expr(*columns) + return self.project_expr(expr, result_label) + def multi_apply_window_op( self, columns: typing.Sequence[str], diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 1f1fb5467f..47730630e3 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -655,6 +655,10 @@ def _repr_html_(self) -> str: html_string += f"[{row_count} rows x {column_count} columns in total]" return html_string + def __delitem__(self, key: str): + df = self.drop(columns=[key]) + self._set_block(df._get_block()) + def __setitem__(self, key: str, value: SingleItemValue): df = self._assign_single_item(key, value) self._set_block(df._get_block()) diff --git a/bigframes/operations/__init__.py b/bigframes/operations/__init__.py index a7c385a2b8..4fefbbf92c 100644 --- a/bigframes/operations/__init__.py +++ b/bigframes/operations/__init__.py @@ -17,7 +17,7 @@ import dataclasses import functools import typing -from typing import Tuple, Union +from typing import Union import numpy as np import pandas as pd @@ -46,7 +46,7 @@ def order_preserving(self) -> bool: @dataclasses.dataclass(frozen=True) -class NaryOp: +class AlignedOp: @property def name(self) -> str: raise NotImplementedError("RowOp abstract base class has no implementation") @@ -60,10 +60,30 @@ def order_preserving(self) -> bool: return False 
+@dataclasses.dataclass(frozen=True) +class NaryOp(AlignedOp): + def as_expr( + self, + *case_output_pairs: Union[str | bigframes.core.expression.Expression], + ) -> bigframes.core.expression.Expression: + import bigframes.core.expression + + # Keep this in sync with output_type and compilers + inputs: list[bigframes.core.expression.Expression] = [] + + for case_or_output in case_output_pairs: + inputs.append(_convert_expr_input(case_or_output)) + + return bigframes.core.expression.OpExpression( + self, + tuple(inputs), + ) + + # These classes can be used to create simple ops that don't take local parameters # All is needed is a unique name, and to register an implementation in ibis_mappings.py @dataclasses.dataclass(frozen=True) -class UnaryOp(NaryOp): +class UnaryOp(AlignedOp): @property def arguments(self) -> int: return 1 @@ -79,7 +99,7 @@ def as_expr( @dataclasses.dataclass(frozen=True) -class BinaryOp(NaryOp): +class BinaryOp(AlignedOp): @property def arguments(self) -> int: return 2 @@ -101,7 +121,7 @@ def as_expr( @dataclasses.dataclass(frozen=True) -class TernaryOp(NaryOp): +class TernaryOp(AlignedOp): @property def arguments(self) -> int: return 3 @@ -655,27 +675,6 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT output_expr_types, ) - def as_expr( - self, - *case_output_pairs: Tuple[ - Union[str | bigframes.core.expression.Expression], - Union[str | bigframes.core.expression.Expression], - ], - ) -> bigframes.core.expression.Expression: - import bigframes.core.expression - - # Keep this in sync with output_type and compilers - inputs: list[bigframes.core.expression.Expression] = [] - - for case, output in case_output_pairs: - inputs.append(_convert_expr_input(case)) - inputs.append(_convert_expr_input(output)) - - return bigframes.core.expression.OpExpression( - self, - tuple(inputs), - ) - case_when_op = CaseWhenOp() diff --git a/bigframes/operations/base.py b/bigframes/operations/base.py index b003ce59cc..75d14f3fbc 100644 --- a/bigframes/operations/base.py +++ b/bigframes/operations/base.py @@ -15,6 +15,7 @@ from __future__ import annotations import typing +from typing import List, Sequence import bigframes_vendored.pandas.pandas._typing as vendored_pandas_typing import numpy @@ -205,6 +206,21 @@ def _apply_binary_op( block, result_id = self._block.project_expr(expr, name) return series.Series(block.select_column(result_id)) + def _apply_nary_op( + self, + op: ops.NaryOp, + others: Sequence[typing.Union[series.Series, scalars.Scalar]], + ignore_self=False, + ): + """Applies an n-ary operator to the series and others.""" + values, block = self._align_n(others, ignore_self=ignore_self) + block, result_id = block.apply_nary_op( + values, + op, + self._name, + ) + return series.Series(block.select_column(result_id)) + def _apply_binary_aggregation( self, other: series.Series, stat: agg_ops.BinaryAggregateOp ) -> float: @@ -226,8 +242,13 @@ def _align_n( self, others: typing.Sequence[typing.Union[series.Series, scalars.Scalar]], how="outer", + ignore_self=False, ) -> tuple[typing.Sequence[str], blocks.Block]: - value_ids = [self._value_column] + if ignore_self: + value_ids: List[str] = [] + else: + value_ids = [self._value_column] + block = self._block for other in others: if isinstance(other, series.Series): diff --git a/bigframes/series.py b/bigframes/series.py index aea3d60ff5..ce13d205bd 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -410,6 +410,25 @@ def between(self, left, right, inclusive="both"): 
self._apply_binary_op(right, right_op) ) + def case_when(self, caselist) -> Series: + return self._apply_nary_op( + ops.case_when_op, + tuple( + itertools.chain( + itertools.chain(*caselist), + # Fallback to current value if no other matches. + ( + # We make a Series with a constant value to avoid casts to + # types other than boolean. + Series(True, index=self.index, dtype=pandas.BooleanDtype()), + self, + ), + ), + ), + # Self is already included in "others". + ignore_self=True, + ) + def cumsum(self) -> Series: return self._apply_window_op( agg_ops.sum_op, bigframes.core.window_spec.WindowSpec(following=0) diff --git a/samples/snippets/logistic_regression_prediction_test.py b/samples/snippets/logistic_regression_prediction_test.py new file mode 100644 index 0000000000..d5e313c202 --- /dev/null +++ b/samples/snippets/logistic_regression_prediction_test.py @@ -0,0 +1,137 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""BigQuery DataFrames code samples for +https://cloud.google.com/bigquery/docs/logistic-regression-prediction. +""" + + +def test_logistic_regression_prediction(random_model_id): + your_model_id = random_model_id + + # [START bigquery_dataframes_logistic_regression_prediction_examine] + import bigframes.pandas as bpd + + df = bpd.read_gbq( + "bigquery-public-data.ml_datasets.census_adult_income", + columns=( + "age", + "workclass", + "marital_status", + "education_num", + "occupation", + "hours_per_week", + "income_bracket", + "functional_weight", + ), + max_results=100, + ) + df.peek() + # Output: + # age workclass marital_status education_num occupation hours_per_week income_bracket functional_weight + # 47 Local-gov Married-civ-spouse 13 Prof-specialty 40 >50K 198660 + # 56 Private Never-married 9 Adm-clerical 40 <=50K 85018 + # 40 Private Married-civ-spouse 12 Tech-support 40 >50K 285787 + # 34 Self-emp-inc Married-civ-spouse 9 Craft-repair 54 >50K 207668 + # 23 Private Married-civ-spouse 10 Handlers-cleaners 40 <=50K 40060 + # [END bigquery_dataframes_logistic_regression_prediction_examine] + + # [START bigquery_dataframes_logistic_regression_prediction_prepare] + import bigframes.pandas as bpd + + input_data = bpd.read_gbq( + "bigquery-public-data.ml_datasets.census_adult_income", + columns=( + "age", + "workclass", + "marital_status", + "education_num", + "occupation", + "hours_per_week", + "income_bracket", + "functional_weight", + ), + ) + input_data["dataframe"] = bpd.Series("training", index=input_data.index,).case_when( + [ + (((input_data["functional_weight"] % 10) == 8), "evaluation"), + (((input_data["functional_weight"] % 10) == 9), "prediction"), + ] + ) + del input_data["functional_weight"] + # [END bigquery_dataframes_logistic_regression_prediction_prepare] + + # [START bigquery_dataframes_logistic_regression_prediction_create_model] + import bigframes.ml.linear_model + + # input_data is defined in an earlier step. 
+ training_data = input_data[input_data["dataframe"] == "training"] + X = training_data.drop(columns=["income_bracket", "dataframe"]) + y = training_data["income_bracket"] + + census_model = bigframes.ml.linear_model.LogisticRegression() + census_model.fit(X, y) + + census_model.to_gbq( + your_model_id, # For example: "your-project.census.census_model" + replace=True, + ) + # [END bigquery_dataframes_logistic_regression_prediction_create_model] + + # [START bigquery_dataframes_logistic_regression_prediction_evaluate_model] + # Select model you'll use for predictions. `read_gbq_model` loads model + # data from BigQuery, but you could also use the `census_model` object + # from previous steps. + census_model = bpd.read_gbq_model( + your_model_id, # For example: "your-project.census.census_model" + ) + + # input_data is defined in an earlier step. + evaluation_data = input_data[input_data["dataframe"] == "evaluation"] + X = evaluation_data.drop(columns=["income_bracket", "dataframe"]) + y = evaluation_data["income_bracket"] + + # The score() method evaluates how the model performs compared to the + # actual data. Output DataFrame matches that of ML.EVALUATE(). + score = census_model.score(X, y) + score.peek() + # Output: + # precision recall accuracy f1_score log_loss roc_auc + # 0 0.685764 0.536685 0.83819 0.602134 0.350417 0.882953 + # [END bigquery_dataframes_logistic_regression_prediction_evaluate_model] + + # [START bigquery_dataframes_logistic_regression_prediction_predict_income_bracket] + # Select model you'll use for predictions. `read_gbq_model` loads model + # data from BigQuery, but you could also use the `census_model` object + # from previous steps. + census_model = bpd.read_gbq_model( + your_model_id, # For example: "your-project.census.census_model" + ) + + # input_data is defined in an earlier step. + prediction_data = input_data[input_data["dataframe"] == "prediction"] + + predictions = census_model.predict(prediction_data) + predictions.peek() + # Output: + # predicted_income_bracket predicted_income_bracket_probs age workclass ... occupation hours_per_week income_bracket dataframe + # 18004 <=50K [{'label': ' >50K', 'prob': 0.0763305999358786... 75 ? ... ? 6 <=50K prediction + # 18886 <=50K [{'label': ' >50K', 'prob': 0.0448866871906495... 73 ? ... ? 22 >50K prediction + # 31024 <=50K [{'label': ' >50K', 'prob': 0.0362982319421936... 69 ? ... ? 1 <=50K prediction + # 31022 <=50K [{'label': ' >50K', 'prob': 0.0787836112058324... 75 ? ... ? 5 <=50K prediction + # 23295 <=50K [{'label': ' >50K', 'prob': 0.3385373037905673... 78 ? ... ? 32 <=50K prediction + # [END bigquery_dataframes_logistic_regression_prediction_predict_income_bracket] + + # TODO(tswast): Implement ML.EXPLAIN_PREDICT() and corresponding sample. + # TODO(tswast): Implement ML.GLOBAL_EXPLAIN() and corresponding sample. diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py index 38aed19f05..c6dd798171 100644 --- a/tests/system/small/test_series.py +++ b/tests/system/small/test_series.py @@ -2565,6 +2565,30 @@ def test_between(scalars_df_index, scalars_pandas_df_index, left, right, inclusi ) +def test_case_when(scalars_df_index, scalars_pandas_df_index): + bf_series = scalars_df_index["int64_col"] + pd_series = scalars_pandas_df_index["int64_col"] + + # TODO(tswast): pandas case_when appears to assume True when a value is + # null. I suspect this should be considered a bug in pandas. 
+ bf_result = bf_series.case_when( + [ + ((bf_series > 100).fillna(True), 1000), + ((bf_series < -100).fillna(True), -1000), + ] + ).to_pandas() + pd_result = pd_series.case_when( + [ + (pd_series > 100, 1000), + (pd_series < -100, -1000), + ] + ) + pd.testing.assert_series_equal( + bf_result, + pd_result.astype(pd.Int64Dtype()), + ) + + def test_to_frame(scalars_dfs): scalars_df, scalars_pandas_df = scalars_dfs diff --git a/third_party/bigframes_vendored/pandas/core/series.py b/third_party/bigframes_vendored/pandas/core/series.py index 4833c41ff7..e155fb073a 100644 --- a/third_party/bigframes_vendored/pandas/core/series.py +++ b/third_party/bigframes_vendored/pandas/core/series.py @@ -6,10 +6,12 @@ from typing import ( Hashable, IO, + List, Literal, Mapping, Optional, Sequence, + Tuple, TYPE_CHECKING, Union, ) @@ -1937,6 +1939,59 @@ def between( """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + def case_when( + self, + caselist: List[Tuple[Series, Series]], + ) -> Series: + """Replace values where the conditions are True. + + **Examples:** + + >>> import bigframes.pandas as bpd + >>> bpd.options.display.progress_bar = None + + >>> c = bpd.Series([6, 7, 8, 9], name="c") + >>> a = bpd.Series([0, 0, 1, 2]) + >>> b = bpd.Series([0, 3, 4, 5]) + + >>> c.case_when( + ... caselist=[ + ... (a.gt(0), a), # condition, replacement + ... (b.gt(0), b), + ... ] + ... ) + 0 6 + 1 3 + 2 1 + 3 2 + Name: c, dtype: Int64 + + **See also:** + + - :func:`bigframes.series.Series.mask` : Replace values where the condition is True. + + Args: + caselist: + A list of tuples of conditions and expected replacements + Takes the form: ``(condition0, replacement0)``, + ``(condition1, replacement1)``, ... . + ``condition`` should be a 1-D boolean array-like object + or a callable. If ``condition`` is a callable, + it is computed on the Series + and should return a boolean Series or array. + The callable must not change the input Series + (though pandas doesn`t check it). ``replacement`` should be a + 1-D array-like object, a scalar or a callable. + If ``replacement`` is a callable, it is computed on the Series + and should return a scalar or Series. The callable + must not change the input Series + (though pandas doesn`t check it). + + Returns: + bigframes.series.Series + """ + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + def cumprod(self): """ Return cumulative product over a DataFrame or Series axis. 
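A minimal sketch of the calling convention that `case_when_op.as_expr()` uses after this commit. Rather than (predicate, output) tuples, it now receives a flat, alternating sequence of predicate and output expressions; that is why `unpivot()` flattens its generated pairs with `itertools.chain`, and why `Series.case_when()` chains the caselist and then appends a final always-true/self pair so unmatched rows keep their original values. The strings below are placeholders standing in for column ids, used only to show the flattening; nothing beyond the standard library is assumed.

import itertools

# Hypothetical (predicate, output) pairs, one per input column, in the shape
# that unpivot() and Series.case_when() start from.
pairs = [("pred_0", "col_a"), ("pred_1", "col_b")]

# itertools.chain(*pairs) flattens them into the alternating sequence that
# ops.case_when_op.as_expr(*cases) consumes: pred_0, col_a, pred_1, col_b.
cases = tuple(itertools.chain(*pairs))
assert cases == ("pred_0", "col_a", "pred_1", "col_b")
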
From 0344ad382f04db44f74730604b04879c0a0ac89c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20Swe=C3=B1a=20=28Swast=29?= Date: Thu, 9 May 2024 18:43:07 -0500 Subject: [PATCH 2/4] rename to ScalarOp --- bigframes/operations/__init__.py | 10 +++++----- tests/system/small/test_series.py | 6 ++++++ 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/bigframes/operations/__init__.py b/bigframes/operations/__init__.py index 4fefbbf92c..b85af06baa 100644 --- a/bigframes/operations/__init__.py +++ b/bigframes/operations/__init__.py @@ -46,7 +46,7 @@ def order_preserving(self) -> bool: @dataclasses.dataclass(frozen=True) -class AlignedOp: +class ScalarOp: @property def name(self) -> str: raise NotImplementedError("RowOp abstract base class has no implementation") @@ -61,7 +61,7 @@ def order_preserving(self) -> bool: @dataclasses.dataclass(frozen=True) -class NaryOp(AlignedOp): +class NaryOp(ScalarOp): def as_expr( self, *case_output_pairs: Union[str | bigframes.core.expression.Expression], @@ -83,7 +83,7 @@ def as_expr( # These classes can be used to create simple ops that don't take local parameters # All is needed is a unique name, and to register an implementation in ibis_mappings.py @dataclasses.dataclass(frozen=True) -class UnaryOp(AlignedOp): +class UnaryOp(ScalarOp): @property def arguments(self) -> int: return 1 @@ -99,7 +99,7 @@ def as_expr( @dataclasses.dataclass(frozen=True) -class BinaryOp(AlignedOp): +class BinaryOp(ScalarOp): @property def arguments(self) -> int: return 2 @@ -121,7 +121,7 @@ def as_expr( @dataclasses.dataclass(frozen=True) -class TernaryOp(AlignedOp): +class TernaryOp(ScalarOp): @property def arguments(self) -> int: return 3 diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py index c6dd798171..beb99b1ada 100644 --- a/tests/system/small/test_series.py +++ b/tests/system/small/test_series.py @@ -2566,6 +2566,12 @@ def test_between(scalars_df_index, scalars_pandas_df_index, left, right, inclusi def test_case_when(scalars_df_index, scalars_pandas_df_index): + pytest.importorskip( + "pandas", + minversion="2.2.0", + reason="case_when added in pandas 2.2.0", + ) + bf_series = scalars_df_index["int64_col"] pd_series = scalars_pandas_df_index["int64_col"] From 4de1cab2e8786fa066aab781344027d86131e904 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20Swe=C3=B1a=20=28Swast=29?= Date: Thu, 9 May 2024 18:45:51 -0500 Subject: [PATCH 3/4] rename to exprs --- bigframes/operations/__init__.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/bigframes/operations/__init__.py b/bigframes/operations/__init__.py index b85af06baa..e52f488d38 100644 --- a/bigframes/operations/__init__.py +++ b/bigframes/operations/__init__.py @@ -64,15 +64,15 @@ def order_preserving(self) -> bool: class NaryOp(ScalarOp): def as_expr( self, - *case_output_pairs: Union[str | bigframes.core.expression.Expression], + *exprs: Union[str | bigframes.core.expression.Expression], ) -> bigframes.core.expression.Expression: import bigframes.core.expression # Keep this in sync with output_type and compilers inputs: list[bigframes.core.expression.Expression] = [] - for case_or_output in case_output_pairs: - inputs.append(_convert_expr_input(case_or_output)) + for expr in exprs: + inputs.append(_convert_expr_input(expr)) return bigframes.core.expression.OpExpression( self, From 236b1ab64a04b895fd03bc879a3b8d02cc9809d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20Swe=C3=B1a=20=28Swast=29?= Date: Thu, 9 May 2024 18:52:00 -0500 Subject: [PATCH 4/4] add type 
annotations --- samples/snippets/logistic_regression_prediction_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/snippets/logistic_regression_prediction_test.py b/samples/snippets/logistic_regression_prediction_test.py index d5e313c202..6a40369ba8 100644 --- a/samples/snippets/logistic_regression_prediction_test.py +++ b/samples/snippets/logistic_regression_prediction_test.py @@ -17,7 +17,7 @@ """ -def test_logistic_regression_prediction(random_model_id): +def test_logistic_regression_prediction(random_model_id: str) -> None: your_model_id = random_model_id # [START bigquery_dataframes_logistic_regression_prediction_examine]
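
For quick reference, a minimal usage sketch of the two user-facing APIs this series adds, `Series.case_when()` and `DataFrame.__delitem__`. This is illustrative only: it assumes an authenticated BigQuery DataFrames session with a default project configured, and the column names and data are invented.

import bigframes.pandas as bpd

df = bpd.DataFrame(
    {
        "temperature": [-5, 10, 25, 40],
        "scratch": [0, 1, 2, 3],
    }
)

# Series.case_when(): conditions are evaluated in order, the first match wins,
# and rows matching no condition keep the value of the base series ("mild").
temps = df["temperature"]
df["label"] = bpd.Series("mild", index=df.index).case_when(
    [
        (temps < 0, "freezing"),
        (temps > 30, "hot"),
    ]
)

# DataFrame.__delitem__(): `del` drops a column, with the same effect as
# df = df.drop(columns=["scratch"]).
del df["scratch"]

# Materialize a small preview, as in the samples above.
df.peek()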