From 735da4af8104ee664ab0e1d4ab40cedbf50fa4eb Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Sat, 23 Sep 2023 01:14:24 +0000 Subject: [PATCH 1/3] feat: add update and align methods to dataframe --- bigframes/clients.py | 2 +- bigframes/core/block_transforms.py | 65 +++++++++++ bigframes/dataframe.py | 108 +++++++++++------- tests/system/small/test_dataframe.py | 74 +++++++++++- .../bigframes_vendored/pandas/core/frame.py | 62 ++++++++++ 5 files changed, 263 insertions(+), 48 deletions(-) diff --git a/bigframes/clients.py b/bigframes/clients.py index b60fcba04a..10deb7ec0a 100644 --- a/bigframes/clients.py +++ b/bigframes/clients.py @@ -83,7 +83,7 @@ def create_bq_connection( service_account_id = cast(str, service_account_id) # Ensure IAM role on the BQ connection # https://cloud.google.com/bigquery/docs/reference/standard-sql/remote-functions#grant_permission_on_function - self._ensure_iam_binding(project_id, service_account_id, iam_role) + self._ensure_iam_binding(project_id, service_account_id, iam_role) # type: ignore # Introduce retries to accommodate transient errors like etag mismatch, # which can be caused by concurrent operation on the same resource, and diff --git a/bigframes/core/block_transforms.py b/bigframes/core/block_transforms.py index da6ba65b8a..6126a566b4 100644 --- a/bigframes/core/block_transforms.py +++ b/bigframes/core/block_transforms.py @@ -500,3 +500,68 @@ def _kurt_from_moments_and_count( kurt_id, na_cond_id, ops.partial_arg3(ops.where_op, None) ) return block, kurt_id + + +def align( + left_block: blocks.Block, + right_block: blocks.Block, + join: str = "outer", + axis: typing.Union[str, int, None] = None, +) -> typing.Tuple[blocks.Block, blocks.Block]: + axis_n = core.utils.get_axis_number(axis) if axis is not None else None + if (axis_n == 0) or (axis_n is None): + # Align rows + joined_index, (get_column_left, get_column_right) = left_block.index.join( + right_block.index, how=join + ) + left_block_merged = joined_index._block + right_block_merged = joined_index._block + # Syncing the blocks ensures that the resulting blocks can utilize row identity join + sync_blocks = True + else: + left_block_merged = left_block + right_block_merged = right_block + get_column_left = get_column_right = lambda x: x + sync_blocks = False + + if (axis_n == 1) or (axis_n is None): + # align columns + columns, lcol_indexer, rcol_indexer = left_block.column_labels.join( + right_block.column_labels, how=join, return_indexers=True + ) + column_indices = zip( + lcol_indexer if (lcol_indexer is not None) else range(len(columns)), + rcol_indexer if (rcol_indexer is not None) else range(len(columns)), + ) + left_column_ids = [] + right_column_ids = [] + + for left_index, right_index in column_indices: + if left_index >= 0: + left_col_id = get_column_left(left_block.value_columns[left_index]) + else: + dtype = right_block.dtypes[right_index] + left_block_merged, left_col_id = left_block_merged.create_constant( + None, dtype=dtype, label=right_block.column_labels[right_index] + ) + if sync_blocks: + right_block_merged = left_block_merged + left_column_ids.append(left_col_id) + + if right_index >= 0: + right_col_id = get_column_right(right_block.value_columns[right_index]) + else: + dtype = left_block.dtypes[left_index] + right_block_merged, right_col_id = right_block_merged.create_constant( + None, dtype=dtype, label=left_block.column_labels[left_index] + ) + if sync_blocks: + left_block_merged = right_block_merged + right_column_ids.append(right_col_id) + else: + left_column_ids = [get_column_left(col) for col in left_block.value_columns] + right_column_ids = [get_column_right(col) for col in right_block.value_columns] + + left_final = left_block_merged.select_columns(left_column_ids) + right_final = right_block_merged.select_columns(right_column_ids) + return left_final, right_final diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index e4e22e0306..fd847bdc60 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -745,6 +745,55 @@ def rpow( __rpow__ = rpow + def align( + self, + other: typing.Union[DataFrame, bigframes.series.Series], + join: str = "outer", + axis: typing.Union[str, int, None] = None, + ) -> typing.Tuple[ + typing.Union[DataFrame, bigframes.series.Series], + typing.Union[DataFrame, bigframes.series.Series], + ]: + axis_n = utils.get_axis_number(axis) if axis else None + if axis_n == 1 and isinstance(other, bigframes.series.Series): + raise NotImplementedError( + f"align with series and axis=1 not supported. {constants.FEEDBACK_LINK}" + ) + left_block, right_block = block_ops.align( + self._block, other._block, join=join, axis=axis + ) + return DataFrame(left_block), other.__class__(right_block) + + def update(self, other, join: str = "left", overwrite=True, filter_func=None): + other = other if isinstance(other, DataFrame) else DataFrame(other) + if join != "left": + raise ValueError("Only 'left' join supported for update") + + if filter_func is not None: # Will always take other if possible + + def update_func( + left: bigframes.series.Series, right: bigframes.series.Series + ) -> bigframes.series.Series: + return left.mask(right.notna() & filter_func(left), right) + + elif overwrite: + + def update_func( + left: bigframes.series.Series, right: bigframes.series.Series + ) -> bigframes.series.Series: + return left.mask(right.notna(), right) + + else: + + def update_func( + left: bigframes.series.Series, right: bigframes.series.Series + ) -> bigframes.series.Series: + return left.mask(left.isna(), right) + + result = self.combine(other, update_func, how=join) + + self._set_block(result._block) + def combine( self, other: DataFrame, @@ -753,56 +802,31 @@ def combine( ], fill_value=None, overwrite: bool = True, + *, + how: str = "outer", ) -> DataFrame: - # Join rows - joined_index, (get_column_left, get_column_right) = self._block.index.join( - other._block.index, how="outer" - ) - columns, lcol_indexer, rcol_indexer = self.columns.join( - other.columns, how="outer", return_indexers=True - ) + l_aligned, r_aligned = block_ops.align(self._block, other._block, join=how) - column_indices = zip( - lcol_indexer if (lcol_indexer is not None) else range(len(columns)), - rcol_indexer if (lcol_indexer is not None) else range(len(columns)), + other_missing_labels = self._block.column_labels.difference( + other._block.column_labels ) - block = joined_index._block + l_frame = DataFrame(l_aligned) + r_frame = DataFrame(r_aligned) results = [] - for left_index, right_index in column_indices: - if left_index >= 0 and right_index >= 0: # -1 indices indicate missing - left_col_id = get_column_left(self._block.value_columns[left_index]) - right_col_id = get_column_right(other._block.value_columns[right_index]) - left_series = bigframes.series.Series(block.select_column(left_col_id)) - right_series = bigframes.series.Series( - block.select_column(right_col_id) - ) + for (label, lseries), (_, rseries) in zip(l_frame.items(), r_frame.items()): + if not ((label in other_missing_labels) and not overwrite): if fill_value is not None: - left_series = left_series.fillna(fill_value) - right_series = right_series.fillna(fill_value) - results.append(func(left_series, right_series)) - elif left_index >= 0: - # Does not exist in other - if overwrite: - dtype = self.dtypes[left_index] - block, null_col_id = block.create_constant(None, dtype=dtype) - result = bigframes.series.Series(block.select_column(null_col_id)) - results.append(result) + result = func( + lseries.fillna(fill_value), rseries.fillna(fill_value) + ) else: - left_col_id = get_column_left(self._block.value_columns[left_index]) - result = bigframes.series.Series(block.select_column(left_col_id)) - if fill_value is not None: - result = result.fillna(fill_value) - results.append(result) - elif right_index >= 0: - right_col_id = get_column_right(other._block.value_columns[right_index]) - result = bigframes.series.Series(block.select_column(right_col_id)) - if fill_value is not None: - result = result.fillna(fill_value) - results.append(result) + result = func(lseries, rseries) else: - # Should not be possible - raise ValueError("No right or left index.") + result = ( + lseries.fillna(fill_value) if fill_value is not None else lseries + ) + results.append(result) if all([isinstance(val, bigframes.series.Series) for val in results]): import bigframes.core.reshape as rs diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index adf17848ee..d562b4dc7c 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -1211,6 +1211,75 @@ def test_combine( pd.testing.assert_frame_equal(bf_result, pd_result, check_dtype=False) +@pytest.mark.parametrize( + ("overwrite", "filter_func"), + [ + (True, None), + (False, None), + (True, lambda x: x.isna() | (x % 2 == 0)), + ], + ids=[ + "default", + "overwritefalse", + "customfilter", + ], +) +def test_df_update(overwrite, filter_func): + index1 = pandas.Index([1, 2, 3, 4], dtype="Int64") + index2 = pandas.Index([1, 2, 4, 5], dtype="Int64") + pd_df1 = pandas.DataFrame( + {"a": [1, None, 3, 4], "b": [5, 6, None, 8]}, dtype="Int64", index=index1 + ) + pd_df2 = pandas.DataFrame( + {"a": [None, 20, 30, 40], "c": [90, None, 110, 120]}, + dtype="Int64", + index=index2, + ) + + bf_df1 = dataframe.DataFrame(pd_df1) + bf_df2 = dataframe.DataFrame(pd_df2) + + bf_df1.update(bf_df2, overwrite=overwrite, filter_func=filter_func) + pd_df1.update(pd_df2, overwrite=overwrite, filter_func=filter_func) + + pd.testing.assert_frame_equal(bf_df1.to_pandas(), pd_df1) + + +@pytest.mark.parametrize( + ("join", "axis"), + [ + ("outer", None), + ("outer", 0), + ("outer", 1), + ("left", 0), + ("right", 1), + ("inner", None), + ("inner", 1), + ], +) +def test_df_align(join, axis): + index1 = pandas.Index([1, 2, 3, 4], dtype="Int64") + index2 = pandas.Index([1, 2, 4, 5], dtype="Int64") + pd_df1 = pandas.DataFrame( + {"a": [1, None, 3, 4], "b": [5, 6, None, 8]}, dtype="Int64", index=index1 + ) + pd_df2 = pandas.DataFrame( + {"a": [None, 20, 30, 40], "c": [90, None, 110, 120]}, + dtype="Int64", + index=index2, + ) + + bf_df1 = dataframe.DataFrame(pd_df1) + bf_df2 = dataframe.DataFrame(pd_df2) + + bf_result1, bf_result2 = bf_df1.align(bf_df2, join=join, axis=axis) + pd_result1, pd_result2 = pd_df1.align(pd_df2, join=join, axis=axis) + + # Don't check dtype as pandas does unnecessary float conversion + pd.testing.assert_frame_equal(bf_result1.to_pandas(), pd_result1, check_dtype=False) + pd.testing.assert_frame_equal(bf_result2.to_pandas(), pd_result2, check_dtype=False) + + def test_combine_first( scalars_df_index, scalars_df_2_index, @@ -1232,11 +1301,6 @@ def test_combine_first( pd_df_b.columns = ["b", "a", "d"] pd_result = pd_df_a.combine_first(pd_df_b) - print("pandas") - print(pd_result.to_string()) - print("bigframes") - print(bf_result.to_string()) - # Some dtype inconsistency for all-NULL columns pd.testing.assert_frame_equal(bf_result, pd_result, check_dtype=False) diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py index 6ce11cd7e9..5cd9fe5163 100644 --- a/third_party/bigframes_vendored/pandas/core/frame.py +++ b/third_party/bigframes_vendored/pandas/core/frame.py @@ -503,6 +503,35 @@ def drop( """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + def align( + self, + other, + join="outer", + axis=None, + ) -> tuple: + """ + Align two objects on their axes with the specified join method. + + Join method is specified for each axis Index. + + Args: + other (DataFrame or Series): + join ({{'outer', 'inner', 'left', 'right'}}, default 'outer'): + Type of alignment to be performed. + left: use only keys from left frame, preserve key order. + right: use only keys from right frame, preserve key order. + outer: use union of keys from both frames, sort keys lexicographically. + inner: use intersection of keys from both frames, + preserve the order of the left keys. + + axis (allowed axis of the other object, default None): + Align on index (0), columns (1), or both (None). + + Returns: + tuple of (DataFrame, type of other): Aligned objects. + """ + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + def rename( self, *, @@ -1265,6 +1294,39 @@ def combine_first(self, other) -> DataFrame: """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + def update( + self, other, join: str = "left", overwrite: bool = True, filter_func=None + ) -> DataFrame: + """ + Modify in place using non-NA values from another DataFrame. + + Aligns on indices. There is no return value. + + Args: + other (DataFrame, or object coercible into a DataFrame): + Should have at least one matching index/column label + with the original DataFrame. If a Series is passed, + its name attribute must be set, and that will be + used as the column name to align with the original DataFrame. + join ({'left'}, default 'left'): + Only left join is implemented, keeping the index and columns of the + original object. + overwrite (bool, default True): + How to handle non-NA values for overlapping keys: + True: overwrite original DataFrame's values + with values from `other`. + False: only update values that are NA in + the original DataFrame. + + filter_func (callable(1d-array) -> bool 1d-array, optional): + Can choose to replace values other than NA. Return True for values + that should be updated. + + Returns: + None: This method directly changes calling object. + """ + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + # ---------------------------------------------------------------------- # Data reshaping From c3fabf55e33d6b472f0100fa0fed7a875fd363ab Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Thu, 28 Sep 2023 01:24:02 +0000 Subject: [PATCH 2/3] pr comments --- bigframes/clients.py | 2 +- bigframes/core/block_transforms.py | 113 +++++++++++++++-------------- 2 files changed, 61 insertions(+), 54 deletions(-) diff --git a/bigframes/clients.py b/bigframes/clients.py index 10deb7ec0a..b60fcba04a 100644 --- a/bigframes/clients.py +++ b/bigframes/clients.py @@ -83,7 +83,7 @@ def create_bq_connection( service_account_id = cast(str, service_account_id) # Ensure IAM role on the BQ connection # https://cloud.google.com/bigquery/docs/reference/standard-sql/remote-functions#grant_permission_on_function - self._ensure_iam_binding(project_id, service_account_id, iam_role) # type: ignore + self._ensure_iam_binding(project_id, service_account_id, iam_role) # Introduce retries to accommodate transient errors like etag mismatch, # which can be caused by concurrent operation on the same resource, and diff --git a/bigframes/core/block_transforms.py b/bigframes/core/block_transforms.py index 6126a566b4..ee773c53b0 100644 --- a/bigframes/core/block_transforms.py +++ b/bigframes/core/block_transforms.py @@ -509,59 +509,66 @@ def align( axis: typing.Union[str, int, None] = None, ) -> typing.Tuple[blocks.Block, blocks.Block]: axis_n = core.utils.get_axis_number(axis) if axis is not None else None - if (axis_n == 0) or (axis_n is None): - # Align rows - joined_index, (get_column_left, get_column_right) = left_block.index.join( - right_block.index, how=join - ) - left_block_merged = joined_index._block - right_block_merged = joined_index._block - # Syncing the blocks ensures that the resulting blocks can utilize row identity join - sync_blocks = True - else: - left_block_merged = left_block - right_block_merged = right_block - get_column_left = get_column_right = lambda x: x - sync_blocks = False - - if (axis_n == 1) or (axis_n is None): - # align columns - columns, lcol_indexer, rcol_indexer = left_block.column_labels.join( - right_block.column_labels, how=join, return_indexers=True - ) - column_indices = zip( - lcol_indexer if (lcol_indexer is not None) else range(len(columns)), - rcol_indexer if (rcol_indexer is not None) else range(len(columns)), - ) - left_column_ids = [] - right_column_ids = [] + # Must align columns first as other way will likely create extra joins + if (axis_n is None) or axis_n == 1: + left_block, right_block = align_columns(left_block, right_block, join=join) + if (axis_n is None) or axis_n == 0: + left_block, right_block = align_rows(left_block, right_block, join=join) + return left_block, right_block - for left_index, right_index in column_indices: - if left_index >= 0: - left_col_id = get_column_left(left_block.value_columns[left_index]) - else: - dtype = right_block.dtypes[right_index] - left_block_merged, left_col_id = left_block_merged.create_constant( - None, dtype=dtype, label=right_block.column_labels[right_index] - ) - if sync_blocks: - right_block_merged = left_block_merged - left_column_ids.append(left_col_id) - if right_index >= 0: - right_col_id = get_column_right(right_block.value_columns[right_index]) - else: - dtype = left_block.dtypes[left_index] - right_block_merged, right_col_id = right_block_merged.create_constant( - None, dtype=dtype, label=left_block.column_labels[left_index] - ) - if sync_blocks: - left_block_merged = right_block_merged - right_column_ids.append(right_col_id) - else: - left_column_ids = [get_column_left(col) for col in left_block.value_columns] - right_column_ids = [get_column_right(col) for col in right_block.value_columns] - - left_final = left_block_merged.select_columns(left_column_ids) - right_final = right_block_merged.select_columns(right_column_ids) +def align_rows( + left_block: blocks.Block, + right_block: blocks.Block, + join: str = "outer", +): + joined_index, (get_column_left, get_column_right) = left_block.index.join( + right_block.index, how=join + ) + left_columns = [get_column_left(col) for col in left_block.value_columns] + right_columns = [get_column_right(col) for col in right_block.value_columns] + + left_block = joined_index._block.select_columns(left_columns) + right_block = joined_index._block.select_columns(right_columns) + return left_block, right_block + + +def align_columns( + left_block: blocks.Block, + right_block: blocks.Block, + join: str = "outer", +): + columns, lcol_indexer, rcol_indexer = left_block.column_labels.join( + right_block.column_labels, how=join, return_indexers=True + ) + column_indices = zip( + lcol_indexer if (lcol_indexer is not None) else range(len(columns)), + rcol_indexer if (rcol_indexer is not None) else range(len(columns)), + ) + left_column_ids = [] + right_column_ids = [] + + original_left_block = left_block + original_right_block = right_block + + for left_index, right_index in column_indices: + if left_index >= 0: + left_col_id = original_left_block.value_columns[left_index] + else: + dtype = right_block.dtypes[right_index] + left_block, left_col_id = left_block.create_constant( + None, dtype=dtype, label=original_right_block.column_labels[right_index] + ) + left_column_ids.append(left_col_id) + + if right_index >= 0: + right_col_id = original_right_block.value_columns[right_index] + else: + dtype = original_left_block.dtypes[left_index] + right_block, right_col_id = right_block.create_constant( + None, dtype=dtype, label=left_block.column_labels[left_index] + ) + right_column_ids.append(right_col_id) + left_final = left_block.select_columns(left_column_ids) + right_final = right_block.select_columns(right_column_ids) return left_final, right_final From 26018e41d17210c0386277a252dc2750dced08ba Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Thu, 28 Sep 2023 22:04:42 +0000 Subject: [PATCH 3/3] issues with update test in pd 1.x --- tests/system/small/test_dataframe.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index d562b4dc7c..ba76c4b0d3 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -1225,6 +1225,8 @@ def test_combine( ], ) def test_df_update(overwrite, filter_func): + if pd.__version__.startswith("1."): + pytest.skip("dtype handled differently in pandas 1.x.") index1 = pandas.Index([1, 2, 3, 4], dtype="Int64") index2 = pandas.Index([1, 2, 4, 5], dtype="Int64") pd_df1 = pandas.DataFrame(