feat: support BQ managed functions through `read_gbq_function` by jialuoo · Pull Request #1476 · googleapis/python-bigquery-dataframes · GitHub
Merged
48 changes: 22 additions & 26 deletions bigframes/dataframe.py
@@ -4108,32 +4108,26 @@ def map(self, func, na_action: Optional[str] = None) -> DataFrame:
)

def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):
# In Bigframes remote function, DataFrame '.apply' method is specifically
# In Bigframes BigQuery function, DataFrame '.apply' method is specifically
# designed to work with row-wise or column-wise operations, where the input
# to the applied function should be a Series, not a scalar.

if utils.get_axis_number(axis) == 1:
msg = bfe.format_message("axis=1 scenario is in preview.")
warnings.warn(msg, category=bfe.PreviewWarning)

# TODO(jialuo): Deprecate the "bigframes_remote_function" attribute.
# We have some tests using pre-defined remote_function that were
# defined based on "bigframes_remote_function" instead of
# "bigframes_bigquery_function". So we need to fix those pre-defined
# remote functions before deprecating the "bigframes_remote_function"
# attribute. Check if the function is a remote function.
if not hasattr(func, "bigframes_remote_function") and not hasattr(
func, "bigframes_bigquery_function"
):
raise ValueError("For axis=1 a bigframes function must be used.")
if not hasattr(func, "bigframes_bigquery_function"):
raise ValueError(
"For axis=1 a BigFrames BigQuery function must be used."
)

is_row_processor = getattr(func, "is_row_processor")
if is_row_processor:
# Early check whether the dataframe dtypes are currently supported
# in the remote function
# in the bigquery function
# NOTE: Keep in sync with the value converters used in the gcf code
# generated in function_template.py
remote_function_supported_dtypes = (
bigquery_function_supported_dtypes = (
bigframes.dtypes.INT_DTYPE,
bigframes.dtypes.FLOAT_DTYPE,
bigframes.dtypes.BOOL_DTYPE,
@@ -4142,18 +4136,18 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):
)
supported_dtypes_types = tuple(
type(dtype)
for dtype in remote_function_supported_dtypes
for dtype in bigquery_function_supported_dtypes
if not isinstance(dtype, pandas.ArrowDtype)
)
# Check ArrowDtype separately since multiple BigQuery types map to
# ArrowDtype, including BYTES and TIMESTAMP.
supported_arrow_types = tuple(
dtype.pyarrow_dtype
for dtype in remote_function_supported_dtypes
for dtype in bigquery_function_supported_dtypes
if isinstance(dtype, pandas.ArrowDtype)
)
supported_dtypes_hints = tuple(
str(dtype) for dtype in remote_function_supported_dtypes
str(dtype) for dtype in bigquery_function_supported_dtypes
)

for dtype in self.dtypes:
@@ -4186,10 +4180,11 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):
)
else:
# This is a special case where we are providing not-pandas-like
# extension. If the remote function can take one or more params
# then we assume that here the user intention is to use the
# column values of the dataframe as arguments to the function.
# For this to work the following condition must be true:
# extension. If the bigquery function can take one or more
# params then we assume that here the user intention is to use
# the column values of the dataframe as arguments to the
# function. For this to work the following condition must be
# true:
# 1. The number of input params in the function must be same
# as the number of columns in the dataframe
# 2. The dtypes of the columns in the dataframe must be
@@ -4231,15 +4226,16 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):

return result_series

# At this point column-wise or element-wise remote function operation will
# At this point column-wise or element-wise bigquery function operation will
# be performed (not supported).
if hasattr(func, "bigframes_remote_function"):
if hasattr(func, "bigframes_bigquery_function"):
raise formatter.create_exception_with_feedback_link(
NotImplementedError,
"BigFrames DataFrame '.apply()' does not support remote function "
"for column-wise (i.e. with axis=0) operations, please use a "
"regular python function instead. For element-wise operations of "
"the remote function, please use '.map()'.",
"BigFrames DataFrame '.apply()' does not support BigFrames "
"BigQuery function for column-wise (i.e. with axis=0) "
"operations, please use a regular python function instead. For "
"element-wise operations of the BigFrames BigQuery function, "
"please use '.map()'.",
)

# Per-column apply
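For reference, a minimal sketch of the axis=1 usage this file now requires to go through a BigFrames BigQuery function. It assumes a configured BigQuery project; the routine name and DataFrame columns are hypothetical, and the callable is loaded with read_gbq_function so it carries the bigframes_bigquery_function attribute checked above.

import bigframes.pandas as bpd

# Hypothetical routine that takes two scalar parameters, one per column.
row_func = bpd.read_gbq_function("my-project.my_dataset.my_routine")

df = bpd.DataFrame({"x": [1, 2, 3], "y": [10.0, 20.0, 30.0]})

# Accepted because row_func carries bigframes_bigquery_function; a plain
# Python lambda here would raise the ValueError introduced in this change.
result = df.apply(row_func, axis=1)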
2 changes: 1 addition & 1 deletion bigframes/functions/_function_session.py
@@ -741,7 +741,7 @@ def wrapper(func):
# with that name and would directly manage their lifecycle.
if created_new and (not name):
self._update_temp_artifacts(
func.bigframes_remote_function, func.bigframes_cloud_function
func.bigframes_bigquery_function, func.bigframes_cloud_function
)
return func

6 changes: 5 additions & 1 deletion bigframes/functions/function.py
@@ -219,7 +219,11 @@ def func(*bigframes_args, **bigframes_kwargs):
database=routine_ref.dataset_id,
signature=(ibis_signature.input_types, ibis_signature.output_type),
) # type: ignore
func.bigframes_remote_function = str(routine_ref) # type: ignore
func.bigframes_bigquery_function = str(routine_ref) # type: ignore

# We will keep the "bigframes_remote_function" attr for remote function.
if hasattr(routine, "remote_function_options") and routine.remote_function_options:
func.bigframes_remote_function = func.bigframes_bigquery_function # type: ignore

# set input bigframes data types
has_unknown_dtypes = False
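A minimal sketch of the attribute behavior introduced here, mirroring the assertions in the new tests below; the routine names are hypothetical and an active Session named session is assumed.

# Managed (non-remote) BigQuery UDF: only the new attribute is set.
managed_ref = session.read_gbq_function("my-project.my_dataset.my_managed_udf")
assert hasattr(managed_ref, "bigframes_bigquery_function")
assert not hasattr(managed_ref, "bigframes_remote_function")

# BigQuery remote function: the legacy attribute is kept as an alias.
remote_ref = session.read_gbq_function("my-project.my_dataset.my_remote_udf")
assert remote_ref.bigframes_remote_function == remote_ref.bigframes_bigquery_function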
45 changes: 20 additions & 25 deletions bigframes/series.py
@@ -68,9 +68,9 @@
LevelsType = typing.Union[LevelType, typing.Sequence[LevelType]]


_remote_function_recommendation_message = (
_bigquery_function_recommendation_message = (
"Your functions could not be applied directly to the Series."
" Try converting it to a remote function."
" Try converting it to a BigFrames BigQuery function."
)

_list = list # Type alias to escape Series.list property
@@ -1530,38 +1530,33 @@ def apply(

if not callable(func):
raise ValueError(
"Only a ufunc (a function that applies to the entire Series) or a remote function that only works on single values are supported."
"Only a ufunc (a function that applies to the entire Series) or"
" a BigFrames BigQuery function that only works on single values"
" are supported."
)

# TODO(jialuo): Deprecate the "bigframes_remote_function" attribute.
# We have some tests using pre-defined remote_function that were defined
# based on "bigframes_remote_function" instead of
# "bigframes_bigquery_function". So we need to fix those pre-defined
# remote functions before deprecating the "bigframes_remote_function"
# attribute.
if not hasattr(func, "bigframes_remote_function") and not hasattr(
func, "bigframes_bigquery_function"
):
if not hasattr(func, "bigframes_bigquery_function"):
# It is neither a remote function nor a managed function.
# Then it must be a vectorized function that applies to the Series
# as a whole.
if by_row:
raise ValueError(
"A vectorized non-remote function can be provided only with by_row=False."
" For element-wise operation it must be a remote function."
"A vectorized non-BigFrames BigQuery function can be "
"provided only with by_row=False. For element-wise operation "
"it must be a BigFrames BigQuery function."
)

try:
return func(self)
except Exception as ex:
# This could happen if any of the operators in func is not
# supported on a Series. Let's guide the customer to use a
# remote function instead
# bigquery function instead
if hasattr(ex, "message"):
ex.message += f"\n{_remote_function_recommendation_message}"
ex.message += f"\n{_bigquery_function_recommendation_message}"
raise

# We are working with remote function at this point
# We are working with bigquery function at this point
result_series = self._apply_unary_op(
ops.RemoteFunctionOp(func=func, apply_on_null=True)
)
@@ -1590,21 +1585,21 @@ def combine(
) -> Series:
if not callable(func):
raise ValueError(
"Only a ufunc (a function that applies to the entire Series) or a remote function that only works on single values are supported."
"Only a ufunc (a function that applies to the entire Series) or"
" a BigFrames BigQuery function that only works on single values"
" are supported."
)

if not hasattr(func, "bigframes_remote_function") and not hasattr(
func, "bigframes_bigquery_function"
):
if not hasattr(func, "bigframes_bigquery_function"):
# Keep this in sync with .apply
try:
return func(self, other)
except Exception as ex:
# This could happen if any of the operators in func is not
# supported on a Series. Let's guide the customer to use a
# remote function instead
# bigquery function instead
if hasattr(ex, "message"):
ex.message += f"\n{_remote_function_recommendation_message}"
ex.message += f"\n{_bigquery_function_recommendation_message}"
raise

result_series = self._apply_binary_op(
@@ -1749,10 +1744,10 @@ def duplicated(self, keep: str = "first") -> Series:

def mask(self, cond, other=None) -> Series:
if callable(cond):
if hasattr(cond, "bigframes_remote_function"):
if hasattr(cond, "bigframes_bigquery_function"):
cond = self.apply(cond)
else:
# For non-remote function assume that it is applicable on Series
# For non-BigQuery function assume that it is applicable on Series
cond = self.apply(cond, by_row=False)

if not isinstance(cond, Series):
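A minimal sketch of the Series-side rules after this change (hypothetical routine names, configured BigQuery project assumed): element-wise apply() and combine() expect a BigFrames BigQuery function, while a plain vectorized callable is only accepted with by_row=False.

import bigframes.pandas as bpd

s = bpd.Series([1, 2, 3])

# Element-wise apply: the callable carries bigframes_bigquery_function
# because it was loaded through read_gbq_function.
add_one = bpd.read_gbq_function("my-project.my_dataset.add_one")
s.apply(add_one)

# Vectorized plain-Python callable: only allowed with by_row=False, and it
# is applied to the Series as a whole.
s.apply(lambda series: series + 1, by_row=False)

# combine() follows the same rule for its binary callable.
other = bpd.Series([10, 20, 30])
s.combine(other, bpd.read_gbq_function("my-project.my_dataset.add_xy"))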
45 changes: 45 additions & 0 deletions tests/system/large/functions/test_managed_function.py
@@ -147,6 +147,18 @@ def func(x, y):
.to_pandas()
)
pandas.testing.assert_series_equal(bf_result, pd_result)

# Make sure the read_gbq_function path works for this function.
managed_func_ref = session.read_gbq_function(
managed_func.bigframes_bigquery_function
)
bf_result_gbq = (
scalars_df["string_col"]
.combine(scalars_df["int64_col"], managed_func_ref)
.to_pandas()
)
pandas.testing.assert_series_equal(bf_result_gbq, pd_result)

finally:
# clean up the gcp assets created for the managed function.
cleanup_function_assets(
@@ -181,6 +193,23 @@ def featurize(x: int) -> list[array_dtype]: # type: ignore
# Ignore any dtype disparity.
pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)

# Make sure the read_gbq_function path works for this function.
featurize_ref = session.read_gbq_function(featurize.bigframes_bigquery_function)

assert hasattr(featurize_ref, "bigframes_bigquery_function")
assert not hasattr(featurize_ref, "bigframes_remote_function")
assert (
featurize_ref.bigframes_bigquery_function
== featurize.bigframes_bigquery_function
)

# Test on the function from read_gbq_function.
got = featurize_ref(10)
assert got == [array_dtype(i) for i in [10, 11, 12]]

bf_result_gbq = bf_int64_col.apply(featurize_ref).to_pandas()
pandas.testing.assert_series_equal(bf_result_gbq, pd_result, check_dtype=False)

finally:
# Clean up the gcp assets created for the managed function.
cleanup_function_assets(
@@ -295,6 +324,22 @@ def foo(x, y, z):
expected_result, bf_result, check_dtype=False, check_index_type=False
)

# Make sure the read_gbq_function path works for this function.
foo_ref = session.read_gbq_function(foo.bigframes_bigquery_function)

assert hasattr(foo_ref, "bigframes_bigquery_function")
assert not hasattr(foo_ref, "bigframes_remote_function")
assert foo_ref.bigframes_bigquery_function == foo.bigframes_bigquery_function

# Test on the function from read_gbq_function.
got = foo_ref(10, 38, "hello")
assert got == ["10", "38.0", "hello"]

bf_result_gbq = bf_df.apply(foo_ref, axis=1).to_pandas()
pandas.testing.assert_series_equal(
bf_result_gbq, expected_result, check_dtype=False, check_index_type=False
)

finally:
# Clean up the gcp assets created for the managed function.
cleanup_function_assets(foo, session.bqclient, session.cloudfunctionsclient)