From c972d07d9616498a7247e0b87fe9242c38d8a232 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 22 Jan 2025 20:31:05 +0000 Subject: [PATCH 1/2] feat: Add DataFrame.corrwith method --- bigframes/core/blocks.py | 2 +- bigframes/dataframe.py | 42 +++++++++++++++++++ tests/system/small/test_dataframe.py | 35 ++++++++++++++++ .../bigframes_vendored/pandas/core/frame.py | 41 ++++++++++++++++++ 4 files changed, 119 insertions(+), 1 deletion(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index afc03dbdea..21a31bd5bd 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -2161,7 +2161,7 @@ def _align_both_axes( columns, lcol_indexer, rcol_indexer = self.column_labels, None, None else: columns, lcol_indexer, rcol_indexer = self.column_labels.join( - other.column_labels, how="outer", return_indexers=True + other.column_labels, how=how, return_indexers=True ) lcol_indexer = ( lcol_indexer if (lcol_indexer is not None) else range(len(columns)) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index b2b22fbdbf..95e4b2945f 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -1474,6 +1474,48 @@ def cov(self, *, numeric_only: bool = False) -> DataFrame: return result + def corrwith( + self, + other: typing.Union[DataFrame, bigframes.series.Series], + *, + numeric_only: bool = False, + ): + other_frame = other if isinstance(other, DataFrame) else other.to_frame() + if not numeric_only: + l_frame = self._raise_on_non_numeric("corrwith") + r_frame = other_frame._raise_on_non_numeric("corrwith") + else: + l_frame = self._drop_non_numeric() + r_frame = other_frame._drop_non_numeric() + + l_block = l_frame.astype(bigframes.dtypes.FLOAT_DTYPE)._block + r_block = r_frame.astype(bigframes.dtypes.FLOAT_DTYPE)._block + + if isinstance(other, DataFrame): + block, labels, expr_pairs = l_block._align_both_axes(r_block, how="inner") + else: + assert isinstance(other, bigframes.series.Series) + block, labels, expr_pairs = l_block._align_axis_0(r_block, how="inner") + + na_cols = l_block.column_labels.join( + r_block.column_labels, how="outer" + ).difference(labels) + + block, _ = block.aggregate( + aggregations=tuple( + ex.BinaryAggregation(agg_ops.CorrOp(), left_ex, right_ex) + for left_ex, right_ex in expr_pairs + ), + column_labels=labels, + ) + block = block.project_exprs( + (ex.const(float("nan")),) * len(na_cols), labels=na_cols + ) + block = block.transpose( + original_row_index=pandas.Index([None]), single_row_mode=True + ) + return bigframes.pandas.Series(block) + def to_arrow( self, *, diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index e7d6ad67e1..2ee19d71cc 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -2235,6 +2235,41 @@ def test_cov_w_numeric_only(scalars_dfs_maybe_ordered, columns, numeric_only): ) +def test_df_corrwith_df(scalars_dfs_maybe_ordered): + scalars_df, scalars_pandas_df = scalars_dfs_maybe_ordered + + l_cols = ["int64_col", "float64_col", "int64_too"] + r_cols = ["int64_too", "float64_col"] + + bf_result = scalars_df[l_cols].corrwith(scalars_df[r_cols]).to_pandas() + pd_result = scalars_pandas_df[l_cols].corrwith(scalars_pandas_df[r_cols]) + + # BigFrames and Pandas differ in their data type handling: + # - Column types: BigFrames uses Float64, Pandas uses float64. + # - Index types: BigFrames uses strign, Pandas uses object. + pd.testing.assert_series_equal( + bf_result, pd_result, check_dtype=False, check_index_type=False + ) + + +@skip_legacy_pandas +def test_df_corrwith_series(scalars_dfs_maybe_ordered): + scalars_df, scalars_pandas_df = scalars_dfs_maybe_ordered + + l_cols = ["int64_col", "float64_col", "int64_too"] + r_col = "float64_col" + + bf_result = scalars_df[l_cols].corrwith(scalars_df[r_col]).to_pandas() + pd_result = scalars_pandas_df[l_cols].corrwith(scalars_pandas_df[r_col]) + + # BigFrames and Pandas differ in their data type handling: + # - Column types: BigFrames uses Float64, Pandas uses float64. + # - Index types: BigFrames uses strign, Pandas uses object. + pd.testing.assert_series_equal( + bf_result, pd_result, check_dtype=False, check_index_type=False + ) + + @pytest.mark.parametrize( ("op"), [ diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py index c8ca1b74b5..256ccc8993 100644 --- a/third_party/bigframes_vendored/pandas/core/frame.py +++ b/third_party/bigframes_vendored/pandas/core/frame.py @@ -4054,6 +4054,47 @@ def cov(self, *, numeric_only) -> DataFrame: """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + def corrwith( + self, + other, + *, + numeric_only: bool = False, + ): + """ + Compute pairwise correlation. + + Pairwise correlation is computed between rows or columns of + DataFrame with rows or columns of Series or DataFrame. DataFrames + are first aligned along both axes before computing the + correlations. + + **Examples:** + >>> import bigframes.pandas as bpd + >>> bpd.options.display.progress_bar = None + + >>> index = ["a", "b", "c", "d", "e"] + >>> columns = ["one", "two", "three", "four"] + >>> df1 = bpd.DataFrame(np.arange(20).reshape(5, 4), index=index, columns=columns) + >>> df2 = bpd.DataFrame(np.arange(16).reshape(4, 4), index=index[:4], columns=columns) + >>> df1.corrwith(df2) + one 1.0 + two 1.0 + three 1.0 + four 1.0 + dtype: Float64 + + Args: + other (DataFrame, Series): + Object with which to compute correlations. + + numeric_only (bool, default False): + Include only `float`, `int` or `boolean` data. + + Returns: + bigframes.pandas.Series: Pairwise correlations. + """ + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + def update( self, other, join: str = "left", overwrite: bool = True, filter_func=None ) -> DataFrame: From 8e9a058e6ce8c82d193fe57afe27616875d73ff7 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 24 Jan 2025 00:35:08 +0000 Subject: [PATCH 2/2] add tests and rearrange conditional --- bigframes/core/blocks.py | 14 ++++++------- bigframes/core/expression.py | 3 +++ bigframes/dataframe.py | 8 +++---- tests/system/small/test_dataframe.py | 31 ++++++++++++++++++++++++++++ 4 files changed, 45 insertions(+), 11 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 21a31bd5bd..727ee013f8 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -2152,7 +2152,7 @@ def merge( def _align_both_axes( self, other: Block, how: str - ) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.Expression, ex.Expression]]]: + ) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.RefOrConstant, ex.RefOrConstant]]]: # Join rows aligned_block, (get_column_left, get_column_right) = self.join(other, how=how) # join columns schema @@ -2183,11 +2183,11 @@ def _align_both_axes( left_inputs = [left_input_lookup(i) for i in lcol_indexer] right_inputs = [righ_input_lookup(i) for i in rcol_indexer] - return aligned_block, columns, tuple(zip(left_inputs, right_inputs)) + return aligned_block, columns, tuple(zip(left_inputs, right_inputs)) # type: ignore def _align_axis_0( self, other: Block, how: str - ) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.Expression, ex.Expression]]]: + ) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.DerefOp, ex.DerefOp]]]: assert len(other.value_columns) == 1 aligned_block, (get_column_left, get_column_right) = self.join(other, how=how) @@ -2203,7 +2203,7 @@ def _align_axis_0( def _align_series_block_axis_1( self, other: Block, how: str - ) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.Expression, ex.Expression]]]: + ) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.RefOrConstant, ex.RefOrConstant]]]: assert len(other.value_columns) == 1 if other._transpose_cache is None: raise ValueError( @@ -2244,11 +2244,11 @@ def _align_series_block_axis_1( left_inputs = [left_input_lookup(i) for i in lcol_indexer] right_inputs = [righ_input_lookup(i) for i in rcol_indexer] - return aligned_block, columns, tuple(zip(left_inputs, right_inputs)) + return aligned_block, columns, tuple(zip(left_inputs, right_inputs)) # type: ignore def _align_pd_series_axis_1( self, other: pd.Series, how: str - ) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.Expression, ex.Expression]]]: + ) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.RefOrConstant, ex.RefOrConstant]]]: if self.column_labels.equals(other.index): columns, lcol_indexer, rcol_indexer = self.column_labels, None, None else: @@ -2275,7 +2275,7 @@ def _align_pd_series_axis_1( left_inputs = [left_input_lookup(i) for i in lcol_indexer] right_inputs = [righ_input_lookup(i) for i in rcol_indexer] - return self, columns, tuple(zip(left_inputs, right_inputs)) + return self, columns, tuple(zip(left_inputs, right_inputs)) # type: ignore def _apply_binop( self, diff --git a/bigframes/core/expression.py b/bigframes/core/expression.py index 2d561657cb..9173bebfc4 100644 --- a/bigframes/core/expression.py +++ b/bigframes/core/expression.py @@ -420,3 +420,6 @@ def deterministic(self) -> bool: return ( all(input.deterministic for input in self.inputs) and self.op.deterministic ) + + +RefOrConstant = Union[DerefOp, ScalarConstantExpression] diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 476cf24990..6c866ad4b5 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -1480,12 +1480,12 @@ def corrwith( numeric_only: bool = False, ): other_frame = other if isinstance(other, DataFrame) else other.to_frame() - if not numeric_only: - l_frame = self._raise_on_non_numeric("corrwith") - r_frame = other_frame._raise_on_non_numeric("corrwith") - else: + if numeric_only: l_frame = self._drop_non_numeric() r_frame = other_frame._drop_non_numeric() + else: + l_frame = self._raise_on_non_numeric("corrwith") + r_frame = other_frame._raise_on_non_numeric("corrwith") l_block = l_frame.astype(bigframes.dtypes.FLOAT_DTYPE)._block r_block = r_frame.astype(bigframes.dtypes.FLOAT_DTYPE)._block diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 4e0126bec8..4266cdba88 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -2263,6 +2263,37 @@ def test_df_corrwith_df(scalars_dfs_maybe_ordered): ) +def test_df_corrwith_df_numeric_only(scalars_dfs): + scalars_df, scalars_pandas_df = scalars_dfs + + l_cols = ["int64_col", "float64_col", "int64_too", "string_col"] + r_cols = ["int64_too", "float64_col", "bool_col"] + + bf_result = ( + scalars_df[l_cols].corrwith(scalars_df[r_cols], numeric_only=True).to_pandas() + ) + pd_result = scalars_pandas_df[l_cols].corrwith( + scalars_pandas_df[r_cols], numeric_only=True + ) + + # BigFrames and Pandas differ in their data type handling: + # - Column types: BigFrames uses Float64, Pandas uses float64. + # - Index types: BigFrames uses strign, Pandas uses object. + pd.testing.assert_series_equal( + bf_result, pd_result, check_dtype=False, check_index_type=False + ) + + +def test_df_corrwith_df_non_numeric_error(scalars_dfs): + scalars_df, _ = scalars_dfs + + l_cols = ["int64_col", "float64_col", "int64_too", "string_col"] + r_cols = ["int64_too", "float64_col", "bool_col"] + + with pytest.raises(NotImplementedError): + scalars_df[l_cols].corrwith(scalars_df[r_cols], numeric_only=False) + + @skip_legacy_pandas def test_df_corrwith_series(scalars_dfs_maybe_ordered): scalars_df, scalars_pandas_df = scalars_dfs_maybe_ordered