feat: Add DataFrame.corrwith method by TrevorBergeron · Pull Request #1315 · googleapis/python-bigquery-dataframes · GitHub
[go: up one dir, main page]

Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2152,7 +2152,7 @@ def merge(

def _align_both_axes(
self, other: Block, how: str
) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.Expression, ex.Expression]]]:
) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.RefOrConstant, ex.RefOrConstant]]]:
# Join rows
aligned_block, (get_column_left, get_column_right) = self.join(other, how=how)
# join columns schema
Expand All @@ -2161,7 +2161,7 @@ def _align_both_axes(
columns, lcol_indexer, rcol_indexer = self.column_labels, None, None
else:
columns, lcol_indexer, rcol_indexer = self.column_labels.join(
other.column_labels, how="outer", return_indexers=True
other.column_labels, how=how, return_indexers=True
)
lcol_indexer = (
lcol_indexer if (lcol_indexer is not None) else range(len(columns))
Expand All @@ -2183,11 +2183,11 @@ def _align_both_axes(

left_inputs = [left_input_lookup(i) for i in lcol_indexer]
right_inputs = [righ_input_lookup(i) for i in rcol_indexer]
return aligned_block, columns, tuple(zip(left_inputs, right_inputs))
return aligned_block, columns, tuple(zip(left_inputs, right_inputs)) # type: ignore

def _align_axis_0(
self, other: Block, how: str
) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.Expression, ex.Expression]]]:
) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.DerefOp, ex.DerefOp]]]:
assert len(other.value_columns) == 1
aligned_block, (get_column_left, get_column_right) = self.join(other, how=how)

Expand All @@ -2203,7 +2203,7 @@ def _align_axis_0(

def _align_series_block_axis_1(
self, other: Block, how: str
) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.Expression, ex.Expression]]]:
) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.RefOrConstant, ex.RefOrConstant]]]:
assert len(other.value_columns) == 1
if other._transpose_cache is None:
raise ValueError(
Expand Down Expand Up @@ -2244,11 +2244,11 @@ def _align_series_block_axis_1(

left_inputs = [left_input_lookup(i) for i in lcol_indexer]
right_inputs = [righ_input_lookup(i) for i in rcol_indexer]
return aligned_block, columns, tuple(zip(left_inputs, right_inputs))
return aligned_block, columns, tuple(zip(left_inputs, right_inputs)) # type: ignore

def _align_pd_series_axis_1(
self, other: pd.Series, how: str
) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.Expression, ex.Expression]]]:
) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.RefOrConstant, ex.RefOrConstant]]]:
if self.column_labels.equals(other.index):
columns, lcol_indexer, rcol_indexer = self.column_labels, None, None
else:
Expand All @@ -2275,7 +2275,7 @@ def _align_pd_series_axis_1(

left_inputs = [left_input_lookup(i) for i in lcol_indexer]
right_inputs = [righ_input_lookup(i) for i in rcol_indexer]
return self, columns, tuple(zip(left_inputs, right_inputs))
return self, columns, tuple(zip(left_inputs, right_inputs)) # type: ignore

def _apply_binop(
self,
Expand Down
3 changes: 3 additions & 0 deletions bigframes/core/expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,3 +420,6 @@ def deterministic(self) -> bool:
return (
all(input.deterministic for input in self.inputs) and self.op.deterministic
)


# Alias for an expression that is either a DerefOp or a ScalarConstantExpression.
RefOrConstant = Union[DerefOp, ScalarConstantExpression]
42 changes: 42 additions & 0 deletions bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1473,6 +1473,48 @@ def cov(self, *, numeric_only: bool = False) -> DataFrame:

return result

def corrwith(
    self,
    other: typing.Union[DataFrame, bigframes.series.Series],
    *,
    numeric_only: bool = False,
):
    # Correlate each column of this frame with the matching column of
    # ``other`` (or with the single column of a Series) and return the
    # result as a Series.
    #
    # NOTE(review): documented with comments rather than a docstring on
    # purpose -- the user-facing docstring appears to live on the vendored
    # pandas ``corrwith`` stub; confirm before adding one here.
    other_is_frame = isinstance(other, DataFrame)
    right_frame = other if other_is_frame else other.to_frame()

    # Either silently discard non-numeric columns or refuse to proceed,
    # depending on ``numeric_only``. The left operand is checked first.
    if numeric_only:
        left_numeric = self._drop_non_numeric()
        right_numeric = right_frame._drop_non_numeric()
    else:
        left_numeric = self._raise_on_non_numeric("corrwith")
        right_numeric = right_frame._raise_on_non_numeric("corrwith")

    # Both operands are cast to the float dtype before aggregating.
    float_dtype = bigframes.dtypes.FLOAT_DTYPE
    left_block = left_numeric.astype(float_dtype)._block
    right_block = right_numeric.astype(float_dtype)._block

    # Inner alignment: DataFrames align on both axes, a Series on rows only.
    if other_is_frame:
        aligned, matched_labels, operand_pairs = left_block._align_both_axes(
            right_block, how="inner"
        )
    else:
        assert isinstance(other, bigframes.series.Series)
        aligned, matched_labels, operand_pairs = left_block._align_axis_0(
            right_block, how="inner"
        )

    # Labels in the outer join of both column indexes that did not survive
    # alignment; these are reported as NaN in the result.
    every_label = left_block.column_labels.join(
        right_block.column_labels, how="outer"
    )
    unmatched_labels = every_label.difference(matched_labels)

    # One correlation aggregation per aligned (left, right) expression pair.
    corr_aggregations = tuple(
        ex.BinaryAggregation(agg_ops.CorrOp(), lhs, rhs)
        for lhs, rhs in operand_pairs
    )
    aggregated, _ = aligned.aggregate(
        aggregations=corr_aggregations,
        column_labels=matched_labels,
    )

    # Append one constant-NaN column per unmatched label.
    nan_fillers = (ex.const(float("nan")),) * len(unmatched_labels)
    with_nan_cols = aggregated.project_exprs(nan_fillers, labels=unmatched_labels)

    # The aggregate is a single row; transpose it so labels become the index.
    as_column = with_nan_cols.transpose(
        original_row_index=pandas.Index([None]), single_row_mode=True
    )
    return bigframes.pandas.Series(as_column)

def to_arrow(
self,
*,
Expand Down
66 changes: 66 additions & 0 deletions tests/system/small/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2246,6 +2246,72 @@ def test_cov_w_numeric_only(scalars_dfs_maybe_ordered, columns, numeric_only):
)


def test_df_corrwith_df(scalars_dfs_maybe_ordered):
    """corrwith between two frames with partially overlapping columns matches pandas."""
    bf_df, pd_df = scalars_dfs_maybe_ordered

    left_columns = ["int64_col", "float64_col", "int64_too"]
    right_columns = ["int64_too", "float64_col"]

    bf_left, bf_right = bf_df[left_columns], bf_df[right_columns]
    actual = bf_left.corrwith(bf_right).to_pandas()

    pd_left, pd_right = pd_df[left_columns], pd_df[right_columns]
    expected = pd_left.corrwith(pd_right)

    # Dtype checks are relaxed because the two libraries differ here:
    # - values: BigFrames yields Float64, pandas yields float64.
    # - index: BigFrames yields string, pandas yields object.
    pd.testing.assert_series_equal(
        actual,
        expected,
        check_dtype=False,
        check_index_type=False,
    )

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we should add two more cases:

numeric_only = True, two dfs contain only numeric columns => computation proceeds successfully
numeric_only = True, one of the dfs has a non-numeric column => an error is raised.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added


def test_df_corrwith_df_numeric_only(scalars_dfs):
    """corrwith with numeric_only=True matches pandas when string/bool columns are present."""
    bf_df, pd_df = scalars_dfs

    left_columns = ["int64_col", "float64_col", "int64_too", "string_col"]
    right_columns = ["int64_too", "float64_col", "bool_col"]

    bf_corr = bf_df[left_columns].corrwith(bf_df[right_columns], numeric_only=True)
    actual = bf_corr.to_pandas()
    expected = pd_df[left_columns].corrwith(pd_df[right_columns], numeric_only=True)

    # Dtype checks are relaxed because the two libraries differ here:
    # - values: BigFrames yields Float64, pandas yields float64.
    # - index: BigFrames yields string, pandas yields object.
    pd.testing.assert_series_equal(
        actual,
        expected,
        check_dtype=False,
        check_index_type=False,
    )


def test_df_corrwith_df_non_numeric_error(scalars_dfs):
    """corrwith with numeric_only=False raises NotImplementedError for these mixed-type inputs."""
    bf_df, _unused_pandas_df = scalars_dfs

    left_columns = ["int64_col", "float64_col", "int64_too", "string_col"]
    right_columns = ["int64_too", "float64_col", "bool_col"]

    with pytest.raises(NotImplementedError):
        bf_df[left_columns].corrwith(bf_df[right_columns], numeric_only=False)


@skip_legacy_pandas
def test_df_corrwith_series(scalars_dfs_maybe_ordered):
    """corrwith between a frame and a single selected column matches pandas."""
    bf_df, pd_df = scalars_dfs_maybe_ordered

    left_columns = ["int64_col", "float64_col", "int64_too"]
    right_column = "float64_col"

    bf_corr = bf_df[left_columns].corrwith(bf_df[right_column])
    actual = bf_corr.to_pandas()
    expected = pd_df[left_columns].corrwith(pd_df[right_column])

    # Dtype checks are relaxed because the two libraries differ here:
    # - values: BigFrames yields Float64, pandas yields float64.
    # - index: BigFrames yields string, pandas yields object.
    pd.testing.assert_series_equal(
        actual,
        expected,
        check_dtype=False,
        check_index_type=False,
    )


@pytest.mark.parametrize(
("op"),
[
Expand Down
41 changes: 41 additions & 0 deletions third_party/bigframes_vendored/pandas/core/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -4146,6 +4146,47 @@ def cov(self, *, numeric_only) -> DataFrame:
"""
raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

def corrwith(
    self,
    other,
    *,
    numeric_only: bool = False,
):
    """
    Compute pairwise correlation.

    Pairwise correlation is computed between rows or columns of
    DataFrame with rows or columns of Series or DataFrame. DataFrames
    are first aligned along both axes before computing the
    correlations.

    **Examples:**

        >>> import bigframes.pandas as bpd
        >>> import numpy as np
        >>> bpd.options.display.progress_bar = None

        >>> index = ["a", "b", "c", "d", "e"]
        >>> columns = ["one", "two", "three", "four"]
        >>> df1 = bpd.DataFrame(np.arange(20).reshape(5, 4), index=index, columns=columns)
        >>> df2 = bpd.DataFrame(np.arange(16).reshape(4, 4), index=index[:4], columns=columns)
        >>> df1.corrwith(df2)
        one      1.0
        two      1.0
        three    1.0
        four     1.0
        dtype: Float64

    Args:
        other (DataFrame, Series):
            Object with which to compute correlations.

        numeric_only (bool, default False):
            Include only `float`, `int` or `boolean` data.

    Returns:
        bigframes.pandas.Series: Pairwise correlations.
    """
    # Stub only: this definition carries the documentation and always raises.
    raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

def update(
self, other, join: str = "left", overwrite: bool = True, filter_func=None
) -> DataFrame:
Expand Down
Loading
0