From 91f83b33d3b5e19989903eadf0a47da5c97aef31 Mon Sep 17 00:00:00 2001 From: jialuo Date: Mon, 10 Mar 2025 20:46:50 +0000 Subject: [PATCH 1/8] feat: support read_gbq_function for managed function --- bigframes/dataframe.py | 50 ++++++------- bigframes/functions/_function_session.py | 2 - bigframes/functions/function.py | 2 +- bigframes/series.py | 45 ++++++------ .../large/functions/test_managed_function.py | 36 +++++++++- .../large/functions/test_remote_function.py | 48 ++++++------- .../small/functions/test_managed_function.py | 71 ++++++++++++++++++- .../small/functions/test_remote_function.py | 28 ++++---- 8 files changed, 186 insertions(+), 96 deletions(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 2349e469ab..288a1c8a23 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -4108,7 +4108,7 @@ def map(self, func, na_action: Optional[str] = None) -> DataFrame: ) def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs): - # In Bigframes remote function, DataFrame '.apply' method is specifically + # In Bigframes BigQuery function, DataFrame '.apply' method is specifically # designed to work with row-wise or column-wise operations, where the input # to the applied function should be a Series, not a scalar. @@ -4116,24 +4116,18 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs): msg = bfe.format_message("axis=1 scenario is in preview.") warnings.warn(msg, category=bfe.PreviewWarning) - # TODO(jialuo): Deprecate the "bigframes_remote_function" attribute. - # We have some tests using pre-defined remote_function that were - # defined based on "bigframes_remote_function" instead of - # "bigframes_bigquery_function". So we need to fix those pre-defined - # remote functions before deprecating the "bigframes_remote_function" - # attribute. Check if the function is a remote function. - if not hasattr(func, "bigframes_remote_function") and not hasattr( - func, "bigframes_bigquery_function" - ): - raise ValueError("For axis=1 a bigframes function must be used.") + if not hasattr(func, "bigframes_bigquery_function"): + raise ValueError( + "For axis=1 a BigFrames BigQuery function must be used." + ) is_row_processor = getattr(func, "is_row_processor") if is_row_processor: # Early check whether the dataframe dtypes are currently supported - # in the remote function + # in the bigquery function # NOTE: Keep in sync with the value converters used in the gcf code # generated in function_template.py - remote_function_supported_dtypes = ( + bigquery_function_supported_dtypes = ( bigframes.dtypes.INT_DTYPE, bigframes.dtypes.FLOAT_DTYPE, bigframes.dtypes.BOOL_DTYPE, @@ -4142,18 +4136,18 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs): ) supported_dtypes_types = tuple( type(dtype) - for dtype in remote_function_supported_dtypes + for dtype in bigquery_function_supported_dtypes if not isinstance(dtype, pandas.ArrowDtype) ) # Check ArrowDtype separately since multiple BigQuery types map to # ArrowDtype, including BYTES and TIMESTAMP. supported_arrow_types = tuple( dtype.pyarrow_dtype - for dtype in remote_function_supported_dtypes + for dtype in bigquery_function_supported_dtypes if isinstance(dtype, pandas.ArrowDtype) ) supported_dtypes_hints = tuple( - str(dtype) for dtype in remote_function_supported_dtypes + str(dtype) for dtype in bigquery_function_supported_dtypes ) for dtype in self.dtypes: @@ -4186,10 +4180,11 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs): ) else: # This is a special case where we are providing not-pandas-like - # extension. If the remote function can take one or more params - # then we assume that here the user intention is to use the - # column values of the dataframe as arguments to the function. - # For this to work the following condition must be true: + # extension. If the bigquery function can take one or more + # params then we assume that here the user intention is to use + # the column values of the dataframe as arguments to the + # function. For this to work the following condition must be + # true: # 1. The number or input params in the function must be same # as the number of columns in the dataframe # 2. The dtypes of the columns in the dataframe must be @@ -4231,14 +4226,15 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs): return result_series - # At this point column-wise or element-wise remote function operation will - # be performed (not supported). - if hasattr(func, "bigframes_remote_function"): + # At this point column-wise or element-wise bigquery function operation + # will be performed (not supported). + if hasattr(func, "bigframes_bigquery_function"): raise NotImplementedError( - "BigFrames DataFrame '.apply()' does not support remote function " - "for column-wise (i.e. with axis=0) operations, please use a " - "regular python function instead. For element-wise operations of " - "the remote function, please use '.map()'." + "BigFrames DataFrame '.apply()' does not support BigFrames " + "BigQuery function for column-wise (i.e. with axis=0) " + "operations, please use a regular python function instead. For " + "element-wise operations of the BigFrames BigQuery function, " + "please use '.map()'." ) # Per-column apply diff --git a/bigframes/functions/_function_session.py b/bigframes/functions/_function_session.py index 0ae674b97d..fdd503485a 100644 --- a/bigframes/functions/_function_session.py +++ b/bigframes/functions/_function_session.py @@ -613,7 +613,6 @@ def wrapper(func): func = cloudpickle.loads(cloudpickle.dumps(func)) self._try_delattr(func, "bigframes_cloud_function") - self._try_delattr(func, "bigframes_remote_function") self._try_delattr(func, "bigframes_bigquery_function") self._try_delattr(func, "bigframes_bigquery_function_output_dtype") self._try_delattr(func, "input_dtypes") @@ -692,7 +691,6 @@ def wrapper(func): rf_name ) ) - func.bigframes_remote_function = func.bigframes_bigquery_function func.input_dtypes = tuple( [ bigframes.core.compile.ibis_types.ibis_dtype_to_bigframes_dtype( diff --git a/bigframes/functions/function.py b/bigframes/functions/function.py index 16416eb864..c7f402b24d 100644 --- a/bigframes/functions/function.py +++ b/bigframes/functions/function.py @@ -214,7 +214,7 @@ def func(*bigframes_args, **bigframes_kwargs): database=routine_ref.dataset_id, signature=(ibis_signature.input_types, ibis_signature.output_type), ) # type: ignore - func.bigframes_remote_function = str(routine_ref) # type: ignore + func.bigframes_bigquery_function = str(routine_ref) # type: ignore # set input bigframes data types has_unknown_dtypes = False diff --git a/bigframes/series.py b/bigframes/series.py index 2c37913679..34ac3c3de9 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -68,9 +68,9 @@ LevelsType = typing.Union[LevelType, typing.Sequence[LevelType]] -_remote_function_recommendation_message = ( +_bigquery_function_recommendation_message = ( "Your functions could not be applied directly to the Series." - " Try converting it to a remote function." + " Try converting it to a BigFrames BigQuery function." ) _list = list # Type alias to escape Series.list property @@ -1530,25 +1530,20 @@ def apply( if not callable(func): raise ValueError( - "Only a ufunc (a function that applies to the entire Series) or a remote function that only works on single values are supported." + "Only a ufunc (a function that applies to the entire Series) or" + " a BigFrames BigQuery function that only works on single values" + " are supported." ) - # TODO(jialuo): Deprecate the "bigframes_remote_function" attribute. - # We have some tests using pre-defined remote_function that were defined - # based on "bigframes_remote_function" instead of - # "bigframes_bigquery_function". So we need to fix those pre-defined - # remote functions before deprecating the "bigframes_remote_function" - # attribute. - if not hasattr(func, "bigframes_remote_function") and not hasattr( - func, "bigframes_bigquery_function" - ): + if not hasattr(func, "bigframes_bigquery_function"): # It is neither a remote function nor a managed function. # Then it must be a vectorized function that applies to the Series # as a whole. if by_row: raise ValueError( - "A vectorized non-remote function can be provided only with by_row=False." - " For element-wise operation it must be a remote function." + "A vectorized non-BigFrames BigQuery function can be " + "provided only with by_row=False. For element-wise operation " + "it must be a BigFrames BigQuery function." ) try: @@ -1556,12 +1551,12 @@ def apply( except Exception as ex: # This could happen if any of the operators in func is not # supported on a Series. Let's guide the customer to use a - # remote function instead + # bigquery function instead if hasattr(ex, "message"): - ex.message += f"\n{_remote_function_recommendation_message}" + ex.message += f"\n{_bigquery_function_recommendation_message}" raise - # We are working with remote function at this point + # We are working with bigquery function at this point result_series = self._apply_unary_op( ops.RemoteFunctionOp(func=func, apply_on_null=True) ) @@ -1590,21 +1585,21 @@ def combine( ) -> Series: if not callable(func): raise ValueError( - "Only a ufunc (a function that applies to the entire Series) or a remote function that only works on single values are supported." + "Only a ufunc (a function that applies to the entire Series) or" + " a BigFrames BigQuery function that only works on single values" + " are supported." ) - if not hasattr(func, "bigframes_remote_function") and not hasattr( - func, "bigframes_bigquery_function" - ): + if not hasattr(func, "bigframes_bigquery_function"): # Keep this in sync with .apply try: return func(self, other) except Exception as ex: # This could happen if any of the operators in func is not # supported on a Series. Let's guide the customer to use a - # remote function instead + # bigquery function instead if hasattr(ex, "message"): - ex.message += f"\n{_remote_function_recommendation_message}" + ex.message += f"\n{_bigquery_function_recommendation_message}" raise result_series = self._apply_binary_op( @@ -1749,10 +1744,10 @@ def duplicated(self, keep: str = "first") -> Series: def mask(self, cond, other=None) -> Series: if callable(cond): - if hasattr(cond, "bigframes_remote_function"): + if hasattr(cond, "bigframes_bigquery_function"): cond = self.apply(cond) else: - # For non-remote function assume that it is applicable on Series + # For non-BigQuery function assume that it is applicable on Series cond = self.apply(cond, by_row=False) if not isinstance(cond, Series): diff --git a/tests/system/large/functions/test_managed_function.py b/tests/system/large/functions/test_managed_function.py index 503720edcc..e918db3a9d 100644 --- a/tests/system/large/functions/test_managed_function.py +++ b/tests/system/large/functions/test_managed_function.py @@ -161,6 +161,18 @@ def func(x, y): .to_pandas() ) pandas.testing.assert_series_equal(bf_result, pd_result) + + # Make sure the read_gbq_function path works for this function. + managed_func_ref = session.read_gbq_function( + managed_func.bigframes_bigquery_function + ) + bf_result_gbq = ( + scalars_df["string_col"] + .combine(scalars_df["int64_col"], managed_func_ref) + .to_pandas() + ) + pandas.testing.assert_series_equal(bf_result_gbq, pd_result) + finally: # clean up the gcp assets created for the managed function. cleanup_function_assets( @@ -199,6 +211,16 @@ def featurize(x: int) -> list[array_dtype]: # type: ignore # Ignore any dtype disparity. pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) + # Make sure the read_gbq_function path works for this function. + featurize_ref = session.read_gbq_function(featurize.bigframes_bigquery_function) + + # Test on the function from read_gbq_function. + got = featurize_ref(10) + assert got == [array_dtype(i) for i in [10, 11, 12]] + + bf_result_gbq = bf_int64_col.apply(featurize_ref).to_pandas() + pandas.testing.assert_series_equal(bf_result_gbq, pd_result, check_dtype=False) + finally: # Clean up the gcp assets created for the managed function. cleanup_function_assets( @@ -246,7 +268,7 @@ def func(x, y): get_python_version() not in bff_session._MANAGED_FUNC_PYTHON_VERSIONS, reason=f"Supported version: {bff_session._MANAGED_FUNC_PYTHON_VERSIONS}", ) -def test_manage_function_df_apply_axis_1_array_output(session): +def test_managed_function_df_apply_axis_1_array_output(session): bf_df = bigframes.dataframe.DataFrame( { "Id": [1, 2, 3], @@ -321,6 +343,18 @@ def foo(x, y, z): expected_result, bf_result, check_dtype=False, check_index_type=False ) + # Make sure the read_gbq_function path works for this function. + foo_ref = session.read_gbq_function(foo.bigframes_bigquery_function) + + # Test on the function from read_gbq_function. + got = foo_ref(10, 38, "hello") + assert got == ["10", "38.0", "hello"] + + bf_result_gbq = bf_df.apply(foo_ref, axis=1).to_pandas() + pandas.testing.assert_series_equal( + bf_result_gbq, expected_result, check_dtype=False, check_index_type=False + ) + finally: # Clean up the gcp assets created for the managed function. cleanup_function_assets(foo, session.bqclient, session.cloudfunctionsclient) diff --git a/tests/system/large/functions/test_remote_function.py b/tests/system/large/functions/test_remote_function.py index 65bf20b966..03f2d0ff99 100644 --- a/tests/system/large/functions/test_remote_function.py +++ b/tests/system/large/functions/test_remote_function.py @@ -886,7 +886,7 @@ def square(x): )(square) # The remote function should reflect the explicitly provided name - assert square_remote.bigframes_remote_function == expected_remote_function + assert square_remote.bigframes_bigquery_function == expected_remote_function # Now the expected BQ remote function should exist session.bqclient.get_routine(expected_remote_function) @@ -1017,7 +1017,7 @@ def test_internal(rf, udf): )(square_uniq) # The remote function should reflect the explicitly provided name - assert square_remote1.bigframes_remote_function == expected_remote_function + assert square_remote1.bigframes_bigquery_function == expected_remote_function # Now the expected BQ remote function should exist routine = session.bqclient.get_routine(expected_remote_function) @@ -1041,7 +1041,7 @@ def test_internal(rf, udf): )(square_uniq) # The new remote function should still reflect the explicitly provided name - assert square_remote2.bigframes_remote_function == expected_remote_function + assert square_remote2.bigframes_bigquery_function == expected_remote_function # The expected BQ remote function should still exist routine = session.bqclient.get_routine(expected_remote_function) @@ -1084,7 +1084,7 @@ def plusone(x): )(plusone_uniq) # The new remote function should still reflect the explicitly provided name - assert plusone_remote.bigframes_remote_function == expected_remote_function + assert plusone_remote.bigframes_bigquery_function == expected_remote_function # The expected BQ remote function should still exist routine = session.bqclient.get_routine(expected_remote_function) @@ -1239,7 +1239,7 @@ def square(x): return x * x assert ( - bigquery.Routine(square.bigframes_remote_function).dataset_id + bigquery.Routine(square.bigframes_bigquery_function).dataset_id == session._anonymous_dataset.dataset_id ) @@ -1476,7 +1476,7 @@ def square(x): )(square) bq_routine = session.bqclient.get_routine( - square_remote.bigframes_remote_function + square_remote.bigframes_bigquery_function ) assert bq_routine.remote_function_options.max_batching_rows == max_batching_rows @@ -1623,7 +1623,7 @@ def serialize_row(row): # Let's make sure the read_gbq_function path works for this function serialize_row_reuse = session.read_gbq_function( - serialize_row_remote.bigframes_remote_function, is_row_processor=True + serialize_row_remote.bigframes_bigquery_function, is_row_processor=True ) bf_result = scalars_df[columns].apply(serialize_row_reuse, axis=1).to_pandas() pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) @@ -1929,8 +1929,8 @@ def foo(x: int) -> int: return x + 1 # ensure that remote function artifacts are created - assert foo.bigframes_remote_function is not None - session.bqclient.get_routine(foo.bigframes_remote_function) is not None + assert foo.bigframes_bigquery_function is not None + session.bqclient.get_routine(foo.bigframes_bigquery_function) is not None assert foo.bigframes_cloud_function is not None session.cloudfunctionsclient.get_function( name=foo.bigframes_cloud_function @@ -1941,7 +1941,7 @@ def foo(x: int) -> int: # ensure that the bq remote function is deleted with pytest.raises(google.cloud.exceptions.NotFound): - session.bqclient.get_routine(foo.bigframes_remote_function) + session.bqclient.get_routine(foo.bigframes_bigquery_function) # the deletion of cloud function happens in a non-blocking way, ensure that # it either exists in a being-deleted state, or is already deleted @@ -1969,8 +1969,8 @@ def foo(x: int) -> int: return x + 1 # ensure that remote function artifacts are created - assert foo.bigframes_remote_function is not None - session.bqclient.get_routine(foo.bigframes_remote_function) is not None + assert foo.bigframes_bigquery_function is not None + session.bqclient.get_routine(foo.bigframes_bigquery_function) is not None assert foo.bigframes_cloud_function is not None session.cloudfunctionsclient.get_function( name=foo.bigframes_cloud_function @@ -1980,7 +1980,7 @@ def foo(x: int) -> int: session.close() # ensure that the bq remote function still exists - session.bqclient.get_routine(foo.bigframes_remote_function) is not None + session.bqclient.get_routine(foo.bigframes_bigquery_function) is not None # the deletion of cloud function happens in a non-blocking way, ensure # that it was not deleted and still exists in active state @@ -2017,8 +2017,8 @@ def foo_named(x: int) -> int: # check that BQ remote functiosn were created with corresponding cloud # functions for foo in [foo_unnamed, foo_named]: - assert foo.bigframes_remote_function is not None - session.bqclient.get_routine(foo.bigframes_remote_function) is not None + assert foo.bigframes_bigquery_function is not None + session.bqclient.get_routine(foo.bigframes_bigquery_function) is not None assert foo.bigframes_cloud_function is not None session.cloudfunctionsclient.get_function( name=foo.bigframes_cloud_function @@ -2032,7 +2032,7 @@ def foo_named(x: int) -> int: # ensure that the unnamed bq remote function is deleted along with its # corresponding cloud function with pytest.raises(google.cloud.exceptions.NotFound): - session.bqclient.get_routine(foo_unnamed.bigframes_remote_function) + session.bqclient.get_routine(foo_unnamed.bigframes_bigquery_function) try: gcf = session.cloudfunctionsclient.get_function( name=foo_unnamed.bigframes_cloud_function @@ -2043,7 +2043,7 @@ def foo_named(x: int) -> int: # ensure that the named bq remote function still exists along with its # corresponding cloud function - session.bqclient.get_routine(foo_named.bigframes_remote_function) is not None + session.bqclient.get_routine(foo_named.bigframes_bigquery_function) is not None gcf = session.cloudfunctionsclient.get_function( name=foo_named.bigframes_cloud_function ) @@ -2120,7 +2120,7 @@ def foo(x, y, z): ) # Let's make sure the read_gbq_function path works for this function - foo_reuse = session.read_gbq_function(foo.bigframes_remote_function) + foo_reuse = session.read_gbq_function(foo.bigframes_bigquery_function) bf_result = bf_df.apply(foo_reuse, axis=1).to_pandas() pandas.testing.assert_series_equal( expected_result, bf_result, check_dtype=False, check_index_type=False @@ -2206,7 +2206,7 @@ def foo(x, y, z): ) # Let's make sure the read_gbq_function path works for this function - foo_reuse = session.read_gbq_function(foo.bigframes_remote_function) + foo_reuse = session.read_gbq_function(foo.bigframes_bigquery_function) bf_result = bf_df.apply(foo_reuse, axis=1).to_pandas() pandas.testing.assert_series_equal( expected_result, bf_result, check_dtype=False, check_index_type=False @@ -2306,7 +2306,7 @@ def generate_stats(row: pandas.Series) -> list[int]: # Let's make sure the read_gbq_function path works for this function generate_stats_reuse = session.read_gbq_function( - generate_stats.bigframes_remote_function, + generate_stats.bigframes_bigquery_function, is_row_processor=True, ) bf_result = scalars_df[columns].apply(generate_stats_reuse, axis=1).to_pandas() @@ -2438,7 +2438,7 @@ def add_one(x: int) -> int: )(add_one) temporary_bigquery_remote_function = ( - add_one_remote_temp.bigframes_remote_function + add_one_remote_temp.bigframes_bigquery_function ) assert temporary_bigquery_remote_function is not None assert ( @@ -2515,7 +2515,7 @@ def add_one(x: int) -> int: )(add_one) persistent_bigquery_remote_function = ( - add_one_remote_persist.bigframes_remote_function + add_one_remote_persist.bigframes_bigquery_function ) assert persistent_bigquery_remote_function is not None assert ( @@ -2596,7 +2596,7 @@ def featurize(x: int) -> list[array_dtype]: # type: ignore # Let's make sure the read_gbq_function path works for this function featurize_reuse = session.read_gbq_function( - featurize.bigframes_remote_function # type: ignore + featurize.bigframes_bigquery_function # type: ignore ) bf_result = scalars_df["int64_too"].apply(featurize_reuse).to_pandas() pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) @@ -2634,7 +2634,7 @@ def featurize(x: float) -> list[float]: # type: ignore # Let's make sure the read_gbq_function path works for this function featurize_reuse = unordered_session.read_gbq_function( - featurize.bigframes_remote_function # type: ignore + featurize.bigframes_bigquery_function # type: ignore ) bf_int64_col = scalars_df["float64_col"].dropna() bf_result = bf_int64_col.apply(featurize_reuse).to_pandas() diff --git a/tests/system/small/functions/test_managed_function.py b/tests/system/small/functions/test_managed_function.py index e1af68512a..949cdf1d26 100644 --- a/tests/system/small/functions/test_managed_function.py +++ b/tests/system/small/functions/test_managed_function.py @@ -18,6 +18,7 @@ import bigframes.exceptions from bigframes.functions import _function_session as bff_session +from bigframes.functions import function as bff from bigframes.functions._utils import get_python_version from bigframes.pandas import udf import bigframes.pandas as bpd @@ -42,6 +43,7 @@ ], ) def test_managed_function_series_apply( + session, typ, scalars_dfs, dataset_id_permanent, @@ -78,6 +80,21 @@ def foo(x): assert_pandas_df_equal(bf_result, pd_result, check_dtype=False) + # Make sure the read_gbq_function path works for this function. + foo_ref = bff.read_gbq_function( + function_name=foo.bigframes_bigquery_function, # type: ignore + session=session, + ) + assert hasattr(foo_ref, "bigframes_bigquery_function") + assert foo.bigframes_bigquery_function == foo_ref.bigframes_bigquery_function # type: ignore + + bf_result_col_gbq = scalars_df["int64_too"].apply(foo_ref) + bf_result_gbq = ( + scalars_df["int64_too"].to_frame().assign(result=bf_result_col_gbq).to_pandas() + ) + + assert_pandas_df_equal(bf_result_gbq, pd_result, check_dtype=False) + @pytest.mark.skipif( get_python_version() not in bff_session._MANAGED_FUNC_PYTHON_VERSIONS, @@ -174,7 +191,9 @@ def foo_list(x): get_python_version() not in bff_session._MANAGED_FUNC_PYTHON_VERSIONS, reason=f"Supported version: {bff_session._MANAGED_FUNC_PYTHON_VERSIONS}", ) -def test_managed_function_series_combine_list_output(dataset_id_permanent, scalars_dfs): +def test_managed_function_series_combine_list_output( + session, dataset_id_permanent, scalars_dfs +): def add_list(x: int, y: int) -> list[int]: return [x, y] @@ -208,6 +227,24 @@ def add_list(x: int, y: int) -> list[int]: # Ignore any dtype difference. pd.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) + # Make sure the read_gbq_function path works for this function. + add_list_managed_func_ref = bff.read_gbq_function( + function_name=add_list_managed_func.bigframes_bigquery_function, # type: ignore + session=session, + ) + + # Test on the function from read_gbq_function. + got = add_list_managed_func_ref(10, 38) + assert got == [10, 38] + + bf_result_gbq = ( + bf_df[bf_filter][int_col_name_with_nulls] + .combine(bf_df[bf_filter][int_col_name_no_nulls], add_list_managed_func_ref) + .to_pandas() + ) + + assert_pandas_df_equal(bf_result_gbq, pd_result, check_dtype=False) + @pytest.mark.skipif( get_python_version() not in bff_session._MANAGED_FUNC_PYTHON_VERSIONS, @@ -288,7 +325,9 @@ def add_ints(x, y): get_python_version() not in bff_session._MANAGED_FUNC_PYTHON_VERSIONS, reason=f"Supported version: {bff_session._MANAGED_FUNC_PYTHON_VERSIONS}", ) -def test_managed_function_dataframe_map_list_output(scalars_dfs, dataset_id_permanent): +def test_managed_function_dataframe_map_list_output( + session, scalars_dfs, dataset_id_permanent +): def add_one_list(x): return [x + 1] * 3 @@ -313,6 +352,15 @@ def add_one_list(x): # Ignore any dtype difference. assert_pandas_df_equal(bf_result, pd_result, check_dtype=False) + # Make sure the read_gbq_function path works for this function. + mf_add_one_list_ref = bff.read_gbq_function( + function_name=mf_add_one_list.bigframes_bigquery_function, # type: ignore + session=session, + ) + + bf_result_gbq = bf_int64_df_filtered.map(mf_add_one_list_ref).to_pandas() + assert_pandas_df_equal(bf_result_gbq, pd_result, check_dtype=False) + @pytest.mark.skipif( get_python_version() not in bff_session._MANAGED_FUNC_PYTHON_VERSIONS, @@ -352,3 +400,22 @@ def add_ints_list(x, y): # Ignore any dtype difference. pd.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) + + # Make sure the read_gbq_function path works for this function. + add_ints_list_mf_ref = bff.read_gbq_function( + function_name=add_ints_list_mf.bigframes_bigquery_function, # type: ignore + session=session, + ) + assert hasattr(add_ints_list_mf_ref, "bigframes_bigquery_function") + + with pytest.warns( + bigframes.exceptions.PreviewWarning, + match="axis=1 scenario is in preview.", + ): + bf_result_gbq = ( + bpd.DataFrame({"x": series, "y": series}) + .apply(add_ints_list_mf_ref, axis=1) + .to_pandas() + ) + + assert_pandas_df_equal(bf_result_gbq, pd_result, check_dtype=False) diff --git a/tests/system/small/functions/test_remote_function.py b/tests/system/small/functions/test_remote_function.py index 075a57f23d..2e8c553297 100644 --- a/tests/system/small/functions/test_remote_function.py +++ b/tests/system/small/functions/test_remote_function.py @@ -123,7 +123,7 @@ def square(x): assert square(2) == 4 # Function should have extra metadata attached for remote execution. - assert hasattr(square, "bigframes_remote_function") + assert hasattr(square, "bigframes_bigquery_function") assert hasattr(square, "bigframes_cloud_function") assert hasattr(square, "ibis_node") @@ -665,21 +665,21 @@ def square1(x): assert square1(2) == 4 square2 = bff.read_gbq_function( - function_name=square1.bigframes_remote_function, # type: ignore + function_name=square1.bigframes_bigquery_function, # type: ignore session=session, ) # The newly-created function (square1) should have a remote function AND a # cloud function associated with it, while the read-back version (square2) # should only have a remote function. - assert square1.bigframes_remote_function # type: ignore + assert square1.bigframes_bigquery_function # type: ignore assert square1.bigframes_cloud_function # type: ignore - assert square2.bigframes_remote_function + assert square2.bigframes_bigquery_function assert not hasattr(square2, "bigframes_cloud_function") # They should point to the same function. - assert square1.bigframes_remote_function == square2.bigframes_remote_function # type: ignore + assert square1.bigframes_bigquery_function == square2.bigframes_bigquery_function # type: ignore # The result of applying them should be the same. int64_col = scalars_df_index["int64_col"] @@ -853,7 +853,7 @@ def test_read_gbq_function_reads_udfs(session, bigquery_client, dataset_id): ) # It should point to the named routine and yield the expected results. - assert square.bigframes_remote_function == str(routine.reference) + assert square.bigframes_bigquery_function == str(routine.reference) assert square.input_dtypes == (bigframes.dtypes.INT_DTYPE,) assert square.output_dtype == bigframes.dtypes.INT_DTYPE assert ( @@ -1087,10 +1087,10 @@ def test_df_apply_scalar_func(session, scalars_dfs): with pytest.raises(NotImplementedError) as context: bdf.apply(func_ref) assert str(context.value) == ( - "BigFrames DataFrame '.apply()' does not support remote function for " - "column-wise (i.e. with axis=0) operations, please use a regular python " - "function instead. For element-wise operations of the remote function, " - "please use '.map()'." + "BigFrames DataFrame '.apply()' does not support BigFrames BigQuery " + "function for column-wise (i.e. with axis=0) operations, please use a " + "regular python function instead. For element-wise operations of the " + "BigFrames BigQuery function, please use '.map()'." ) @@ -1133,7 +1133,7 @@ def add_ints(row): dataset_id_permanent, name=get_function_name(add_ints, is_row_processor=True), )(add_ints) - assert add_ints_remote.bigframes_remote_function # type: ignore + assert add_ints_remote.bigframes_bigquery_function # type: ignore assert add_ints_remote.bigframes_cloud_function # type: ignore with pytest.warns( @@ -1155,11 +1155,11 @@ def add_ints(row): # Read back the deployed BQ remote function using read_gbq_function. func_ref = session.read_gbq_function( - function_name=add_ints_remote.bigframes_remote_function, # type: ignore + function_name=add_ints_remote.bigframes_bigquery_function, # type: ignore is_row_processor=True, ) - assert func_ref.bigframes_remote_function == add_ints_remote.bigframes_remote_function # type: ignore + assert func_ref.bigframes_bigquery_function == add_ints_remote.bigframes_bigquery_function # type: ignore bf_result_gbq = scalars_df[columns].apply(func_ref, axis=1).to_pandas() pd.testing.assert_series_equal( @@ -1247,7 +1247,7 @@ def add_ints(row): scalars_pandas_df.apply(add_ints, axis=1) with pytest.raises( - ValueError, match="For axis=1 a bigframes function must be used." + ValueError, match="For axis=1 a BigFrames BigQuery function must be used." ): scalars_df[columns].apply(add_ints, axis=1) From 06c6c3f019fc299d8201f2eaf57204ed9bd9c132 Mon Sep 17 00:00:00 2001 From: jialuo Date: Mon, 10 Mar 2025 22:43:34 +0000 Subject: [PATCH 2/8] quick fix --- bigframes/functions/_function_session.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/functions/_function_session.py b/bigframes/functions/_function_session.py index fdd503485a..8c0a59bb91 100644 --- a/bigframes/functions/_function_session.py +++ b/bigframes/functions/_function_session.py @@ -722,7 +722,7 @@ def wrapper(func): # with that name and would directly manage their lifecycle. if created_new and (not name): self._update_temp_artifacts( - func.bigframes_remote_function, func.bigframes_cloud_function + func.bigframes_bigquery_function, func.bigframes_cloud_function ) return func From 7faf7a9a4ae6d150cadfa96e28e9da235dd57123 Mon Sep 17 00:00:00 2001 From: jialuo Date: Mon, 10 Mar 2025 23:41:26 +0000 Subject: [PATCH 3/8] add back attr in wrapper --- bigframes/functions/_function_session.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/bigframes/functions/_function_session.py b/bigframes/functions/_function_session.py index 8c0a59bb91..eb90c3d64d 100644 --- a/bigframes/functions/_function_session.py +++ b/bigframes/functions/_function_session.py @@ -613,6 +613,7 @@ def wrapper(func): func = cloudpickle.loads(cloudpickle.dumps(func)) self._try_delattr(func, "bigframes_cloud_function") + self._try_delattr(func, "bigframes_remote_function") self._try_delattr(func, "bigframes_bigquery_function") self._try_delattr(func, "bigframes_bigquery_function_output_dtype") self._try_delattr(func, "input_dtypes") @@ -691,6 +692,7 @@ def wrapper(func): rf_name ) ) + func.bigframes_remote_function = func.bigframes_bigquery_function func.input_dtypes = tuple( [ bigframes.core.compile.ibis_types.ibis_dtype_to_bigframes_dtype( From 854e458923d33154c754587fff500b73ee2169ca Mon Sep 17 00:00:00 2001 From: jialuo Date: Tue, 11 Mar 2025 01:13:40 +0000 Subject: [PATCH 4/8] fix tests --- tests/system/small/functions/test_managed_function.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/system/small/functions/test_managed_function.py b/tests/system/small/functions/test_managed_function.py index 949cdf1d26..947f8f9427 100644 --- a/tests/system/small/functions/test_managed_function.py +++ b/tests/system/small/functions/test_managed_function.py @@ -243,7 +243,7 @@ def add_list(x: int, y: int) -> list[int]: .to_pandas() ) - assert_pandas_df_equal(bf_result_gbq, pd_result, check_dtype=False) + pd.testing.assert_series_equal(bf_result_gbq, pd_result, check_dtype=False) @pytest.mark.skipif( @@ -418,4 +418,4 @@ def add_ints_list(x, y): .to_pandas() ) - assert_pandas_df_equal(bf_result_gbq, pd_result, check_dtype=False) + pd.testing.assert_series_equal(bf_result_gbq, pd_result, check_dtype=False) From 7f4fe529307aad4e8c93c6b299e2c6dc1fe09faa Mon Sep 17 00:00:00 2001 From: jialuo Date: Tue, 11 Mar 2025 23:47:34 +0000 Subject: [PATCH 5/8] add attr for remote function --- bigframes/functions/function.py | 3 +++ .../system/large/functions/test_managed_function.py | 11 +++++++++++ .../system/large/functions/test_remote_function.py | 10 ++++++++++ .../system/small/functions/test_managed_function.py | 13 +++++++++++++ .../system/small/functions/test_remote_function.py | 8 ++++++++ 5 files changed, 45 insertions(+) diff --git a/bigframes/functions/function.py b/bigframes/functions/function.py index c7f402b24d..8a709b87bf 100644 --- a/bigframes/functions/function.py +++ b/bigframes/functions/function.py @@ -215,6 +215,9 @@ def func(*bigframes_args, **bigframes_kwargs): signature=(ibis_signature.input_types, ibis_signature.output_type), ) # type: ignore func.bigframes_bigquery_function = str(routine_ref) # type: ignore + # We will keep the "bigframes_remote_function" attr for remote function. + if hasattr(routine, "remote_function_options") and routine.remote_function_options: + func.bigframes_remote_function = func.bigframes_bigquery_function # type: ignore # set input bigframes data types has_unknown_dtypes = False diff --git a/tests/system/large/functions/test_managed_function.py b/tests/system/large/functions/test_managed_function.py index da452ef6df..a540ec2a48 100644 --- a/tests/system/large/functions/test_managed_function.py +++ b/tests/system/large/functions/test_managed_function.py @@ -202,6 +202,13 @@ def featurize(x: int) -> list[array_dtype]: # type: ignore # Make sure the read_gbq_function path works for this function. featurize_ref = session.read_gbq_function(featurize.bigframes_bigquery_function) + assert hasattr(featurize_ref, "bigframes_bigquery_function") + assert not hasattr(featurize_ref, "bigframes_remote_function") + assert ( + featurize_ref.bigframes_bigquery_function + == featurize.bigframes_bigquery_function + ) + # Test on the function from read_gbq_function. got = featurize_ref(10) assert got == [array_dtype(i) for i in [10, 11, 12]] @@ -334,6 +341,10 @@ def foo(x, y, z): # Make sure the read_gbq_function path works for this function. foo_ref = session.read_gbq_function(foo.bigframes_bigquery_function) + assert hasattr(foo_ref, "bigframes_bigquery_function") + assert not hasattr(foo_ref, "bigframes_remote_function") + assert foo_ref.bigframes_bigquery_function == foo.bigframes_bigquery_function + # Test on the function from read_gbq_function. got = foo_ref(10, 38, "hello") assert got == ["10", "38.0", "hello"] diff --git a/tests/system/large/functions/test_remote_function.py b/tests/system/large/functions/test_remote_function.py index ff26bd84bd..1e5e7ede26 100644 --- a/tests/system/large/functions/test_remote_function.py +++ b/tests/system/large/functions/test_remote_function.py @@ -881,6 +881,7 @@ def square(x): )(square) # The remote function should reflect the explicitly provided name + assert square_remote.bigframes_remote_function == expected_remote_function assert square_remote.bigframes_bigquery_function == expected_remote_function # Now the expected BQ remote function should exist @@ -1012,6 +1013,7 @@ def test_internal(rf, udf): )(square_uniq) # The remote function should reflect the explicitly provided name + assert square_remote1.bigframes_remote_function == expected_remote_function assert square_remote1.bigframes_bigquery_function == expected_remote_function # Now the expected BQ remote function should exist @@ -1036,6 +1038,7 @@ def test_internal(rf, udf): )(square_uniq) # The new remote function should still reflect the explicitly provided name + assert square_remote2.bigframes_remote_function == expected_remote_function assert square_remote2.bigframes_bigquery_function == expected_remote_function # The expected BQ remote function should still exist @@ -1079,6 +1082,7 @@ def plusone(x): )(plusone_uniq) # The new remote function should still reflect the explicitly provided name + assert plusone_remote.bigframes_remote_function == expected_remote_function assert plusone_remote.bigframes_bigquery_function == expected_remote_function # The expected BQ remote function should still exist @@ -1948,6 +1952,8 @@ def foo(x: int) -> int: return x + 1 # ensure that remote function artifacts are created + assert foo.bigframes_remote_function is not None + session.bqclient.get_routine(foo.bigframes_remote_function) is not None assert foo.bigframes_bigquery_function is not None session.bqclient.get_routine(foo.bigframes_bigquery_function) is not None assert foo.bigframes_cloud_function is not None @@ -1988,6 +1994,8 @@ def foo(x: int) -> int: return x + 1 # ensure that remote function artifacts are created + assert foo.bigframes_remote_function is not None + session.bqclient.get_routine(foo.bigframes_remote_function) is not None assert foo.bigframes_bigquery_function is not None session.bqclient.get_routine(foo.bigframes_bigquery_function) is not None assert foo.bigframes_cloud_function is not None @@ -2036,6 +2044,8 @@ def foo_named(x: int) -> int: # check that BQ remote functiosn were created with corresponding cloud # functions for foo in [foo_unnamed, foo_named]: + assert foo.bigframes_remote_function is not None + session.bqclient.get_routine(foo.bigframes_remote_function) is not None assert foo.bigframes_bigquery_function is not None session.bqclient.get_routine(foo.bigframes_bigquery_function) is not None assert foo.bigframes_cloud_function is not None diff --git a/tests/system/small/functions/test_managed_function.py b/tests/system/small/functions/test_managed_function.py index 947f8f9427..0b11080168 100644 --- a/tests/system/small/functions/test_managed_function.py +++ b/tests/system/small/functions/test_managed_function.py @@ -86,6 +86,7 @@ def foo(x): session=session, ) assert hasattr(foo_ref, "bigframes_bigquery_function") + assert not hasattr(foo_ref, "bigframes_remote_function") assert foo.bigframes_bigquery_function == foo_ref.bigframes_bigquery_function # type: ignore bf_result_col_gbq = scalars_df["int64_too"].apply(foo_ref) @@ -233,6 +234,13 @@ def add_list(x: int, y: int) -> list[int]: session=session, ) + assert hasattr(add_list_managed_func_ref, "bigframes_bigquery_function") + assert not hasattr(add_list_managed_func_ref, "bigframes_remote_function") + assert ( + add_list_managed_func_ref.bigframes_bigquery_function + == add_list_managed_func.bigframes_bigquery_function + ) + # Test on the function from read_gbq_function. got = add_list_managed_func_ref(10, 38) assert got == [10, 38] @@ -407,6 +415,11 @@ def add_ints_list(x, y): session=session, ) assert hasattr(add_ints_list_mf_ref, "bigframes_bigquery_function") + assert not hasattr(add_ints_list_mf_ref, "bigframes_remote_function") + assert ( + add_ints_list_mf_ref.bigframes_bigquery_function + == add_ints_list_mf.bigframes_bigquery_function + ) with pytest.warns( bigframes.exceptions.PreviewWarning, diff --git a/tests/system/small/functions/test_remote_function.py b/tests/system/small/functions/test_remote_function.py index 2e8c553297..6de6d8c7ef 100644 --- a/tests/system/small/functions/test_remote_function.py +++ b/tests/system/small/functions/test_remote_function.py @@ -123,6 +123,7 @@ def square(x): assert square(2) == 4 # Function should have extra metadata attached for remote execution. + assert hasattr(square, "bigframes_remote_function") assert hasattr(square, "bigframes_bigquery_function") assert hasattr(square, "bigframes_cloud_function") assert hasattr(square, "ibis_node") @@ -672,14 +673,18 @@ def square1(x): # The newly-created function (square1) should have a remote function AND a # cloud function associated with it, while the read-back version (square2) # should only have a remote function. + assert square1.bigframes_remote_function # type: ignore assert square1.bigframes_bigquery_function # type: ignore assert square1.bigframes_cloud_function # type: ignore + assert square2.bigframes_remote_function assert square2.bigframes_bigquery_function assert not hasattr(square2, "bigframes_cloud_function") # They should point to the same function. + assert square1.bigframes_remote_function == square2.bigframes_remote_function # type: ignore assert square1.bigframes_bigquery_function == square2.bigframes_bigquery_function # type: ignore + assert square2.bigframes_remote_function == square2.bigframes_bigquery_function # type: ignore # The result of applying them should be the same. int64_col = scalars_df_index["int64_col"] @@ -1133,6 +1138,7 @@ def add_ints(row): dataset_id_permanent, name=get_function_name(add_ints, is_row_processor=True), )(add_ints) + assert add_ints_remote.bigframes_remote_function # type: ignore assert add_ints_remote.bigframes_bigquery_function # type: ignore assert add_ints_remote.bigframes_cloud_function # type: ignore @@ -1159,7 +1165,9 @@ def add_ints(row): is_row_processor=True, ) + assert func_ref.bigframes_remote_function == add_ints_remote.bigframes_remote_function # type: ignore assert func_ref.bigframes_bigquery_function == add_ints_remote.bigframes_bigquery_function # type: ignore + assert func_ref.bigframes_remote_function == func_ref.bigframes_bigquery_function # type: ignore bf_result_gbq = scalars_df[columns].apply(func_ref, axis=1).to_pandas() pd.testing.assert_series_equal( From b3a95fcfe66293f684f25f0cf51397009a10a427 Mon Sep 17 00:00:00 2001 From: jialuo Date: Wed, 12 Mar 2025 21:28:57 +0000 Subject: [PATCH 6/8] quick fix --- bigframes/functions/function.py | 1 + 1 file changed, 1 insertion(+) diff --git a/bigframes/functions/function.py b/bigframes/functions/function.py index 8a709b87bf..ddb54f6dbe 100644 --- a/bigframes/functions/function.py +++ b/bigframes/functions/function.py @@ -215,6 +215,7 @@ def func(*bigframes_args, **bigframes_kwargs): signature=(ibis_signature.input_types, ibis_signature.output_type), ) # type: ignore func.bigframes_bigquery_function = str(routine_ref) # type: ignore + # We will keep the "bigframes_remote_function" attr for remote function. if hasattr(routine, "remote_function_options") and routine.remote_function_options: func.bigframes_remote_function = func.bigframes_bigquery_function # type: ignore From 246eeaa2f0d355635c21316468115659f49c98df Mon Sep 17 00:00:00 2001 From: jialuo Date: Thu, 13 Mar 2025 18:27:28 +0000 Subject: [PATCH 7/8] fix rebase --- bigframes/dataframe.py | 13 +++++++------ .../system/small/functions/test_remote_function.py | 2 +- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 4e919af4b6..abab9fd268 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -4226,15 +4226,16 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs): return result_series - # At this point column-wise or element-wise remote function operation will + # At this point column-wise or element-wise bigquery function operation will # be performed (not supported). - if hasattr(func, "bigframes_remote_function"): + if hasattr(func, "bigframes_bigquery_function"): raise formatter.create_exception_with_feedback_link( NotImplementedError, - "BigFrames DataFrame '.apply()' does not support remote function " - "for column-wise (i.e. with axis=0) operations, please use a " - "regular python function instead. For element-wise operations of " - "the remote function, please use '.map()'.", + "BigFrames DataFrame '.apply()' does not support BigFrames " + "BigQuery function for column-wise (i.e. with axis=0) " + "operations, please use a regular python function instead. For " + "element-wise operations of the BigFrames BigQuery function, " + "please use '.map()'.", ) # Per-column apply diff --git a/tests/system/small/functions/test_remote_function.py b/tests/system/small/functions/test_remote_function.py index 3e045f869c..0af7f4e42e 100644 --- a/tests/system/small/functions/test_remote_function.py +++ b/tests/system/small/functions/test_remote_function.py @@ -1093,7 +1093,7 @@ def test_df_apply_scalar_func(session, scalars_dfs): with pytest.raises(NotImplementedError) as context: bdf.apply(func_ref) assert str(context.value) == ( - "BigFrames DataFrame '.apply()' does not support BigFrames Bigquery " + "BigFrames DataFrame '.apply()' does not support BigFrames BigQuery " "function for column-wise (i.e. with axis=0) operations, please use a " "regular python function instead. For element-wise operations of the " "BigFrames BigQuery function, please use '.map()'. " From 969a4d6db9cd2f6186468aa6cc8f4c917b85aba9 Mon Sep 17 00:00:00 2001 From: jialuo Date: Fri, 14 Mar 2025 20:52:53 +0000 Subject: [PATCH 8/8] fix rebase --- tests/system/small/functions/test_managed_function.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/system/small/functions/test_managed_function.py b/tests/system/small/functions/test_managed_function.py index d1abc99855..54cf494f79 100644 --- a/tests/system/small/functions/test_managed_function.py +++ b/tests/system/small/functions/test_managed_function.py @@ -182,7 +182,9 @@ def foo_list(x): assert_pandas_df_equal(bf_result, pd_result, check_dtype=False) -def test_managed_function_series_combine_list_output(dataset_id_permanent, scalars_dfs): +def test_managed_function_series_combine_list_output( + session, dataset_id_permanent, scalars_dfs +): def add_list(x: int, y: int) -> list[int]: return [x, y] @@ -309,7 +311,9 @@ def add_ints(x, y): ) -def test_managed_function_dataframe_map_list_output(scalars_dfs, dataset_id_permanent): +def test_managed_function_dataframe_map_list_output( + session, scalars_dfs, dataset_id_permanent +): def add_one_list(x): return [x + 1] * 3