From b479be2b0351d008578b0f1c254b758383576d73 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Wed, 23 Apr 2025 01:25:49 +0000 Subject: [PATCH 1/4] docs: add sample code snippets for `udf` --- samples/snippets/conftest.py | 11 +++ samples/snippets/remote_function.py | 4 +- samples/snippets/udf.py | 121 ++++++++++++++++++++++++++++ samples/snippets/udf_test.py | 64 +++++++++++++++ 4 files changed, 198 insertions(+), 2 deletions(-) create mode 100644 samples/snippets/udf.py create mode 100644 samples/snippets/udf_test.py diff --git a/samples/snippets/conftest.py b/samples/snippets/conftest.py index 5cba045ce4..e8253bc5a7 100644 --- a/samples/snippets/conftest.py +++ b/samples/snippets/conftest.py @@ -24,6 +24,8 @@ "python-bigquery-dataframes", "samples/snippets" ) +routine_prefixer = test_utils.prefixer.Prefixer("bigframes", "") + @pytest.fixture(scope="session", autouse=True) def cleanup_datasets(bigquery_client: bigquery.Client) -> None: @@ -106,3 +108,12 @@ def random_model_id_eu( full_model_id = f"{project_id}.{dataset_id_eu}.{random_model_id_eu}" yield full_model_id bigquery_client.delete_model(full_model_id, not_found_ok=True) + + +@pytest.fixture +def routine_id() -> Iterator[str]: + """Create a new BQ routine ID each time, so random_routine_id can be used as + target for udf creation. + """ + random_routine_id = routine_prefixer.create_prefix() + yield random_routine_id diff --git a/samples/snippets/remote_function.py b/samples/snippets/remote_function.py index 3a7031ef89..4c5b365007 100644 --- a/samples/snippets/remote_function.py +++ b/samples/snippets/remote_function.py @@ -21,7 +21,7 @@ def run_remote_function_and_read_gbq_function(project_id: str) -> None: # Set BigQuery DataFrames options bpd.options.bigquery.project = your_gcp_project_id - bpd.options.bigquery.location = "us" + bpd.options.bigquery.location = "US" # BigQuery DataFrames gives you the ability to turn your custom scalar # functions into a BigQuery remote function. 
It requires the GCP project to @@ -56,7 +56,7 @@ def get_bucket(num: float) -> str: boundary = 4000 return "at_or_above_4000" if num >= boundary else "below_4000" - # Then we can apply the remote function on the `Series`` of interest via + # Then we can apply the remote function on the `Series` of interest via # `apply` API and store the result in a new column in the DataFrame. df = df.assign(body_mass_bucket=df["body_mass_g"].apply(get_bucket)) diff --git a/samples/snippets/udf.py b/samples/snippets/udf.py new file mode 100644 index 0000000000..495cd33e84 --- /dev/null +++ b/samples/snippets/udf.py @@ -0,0 +1,121 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def run_udf_and_read_gbq_function( + project_id: str, dataset_id: str, routine_id: str +) -> None: + your_gcp_project_id = project_id + your_bq_dataset_id = dataset_id + your_bq_routine_id = routine_id + + # [START bigquery_dataframes_udf] + import bigframes.pandas as bpd + + # Set BigQuery DataFrames options + bpd.options.bigquery.project = your_gcp_project_id + bpd.options.bigquery.location = "US" + + # BigQuery DataFrames gives you the ability to turn your custom functions + # into a BigQuery Python UDF. One can find more details about the usage and + # the requirements via `help` command. + help(bpd.udf) + + # Read a table and inspect the column of interest. 
+ df = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins") + df["body_mass_g"].peek(10) + + # Define a custom function, and specify the intent to turn it into a + # BigQuery Python UDF. Let's try a `pandas`-like use case in which we want + # to apply a user defined function to every value in a `Series`, more + # specifically bucketize the `body_mass_g` value of the penguins, which is a + # real number, into a category, which is a string. + @bpd.udf( + dataset=your_bq_dataset_id, + name=your_bq_routine_id, + ) + def get_bucket(num: float) -> str: + if not num: + return "NA" + boundary = 4000 + return "at_or_above_4000" if num >= boundary else "below_4000" + + # Then we can apply the udf on the `Series` of interest via + # `apply` API and store the result in a new column in the DataFrame. + df = df.assign(body_mass_bucket=df["body_mass_g"].apply(get_bucket)) + + # This will add a new column `body_mass_bucket` in the DataFrame. You can + # preview the original value and the bucketized value side by side. + df[["body_mass_g", "body_mass_bucket"]].peek(10) + + # The above operation was possible by doing all the computation on the + # cloud through an underlying BigQuery Python UDF that was created to + # support the user's operations in the Python code. + + # The BigQuery Python UDF created to support the BigQuery DataFrames + # udf can be located via a property `bigframes_bigquery_function` + # set in the udf object. + print(f"Created BQ Python UDF: {get_bucket.bigframes_bigquery_function}") + + # If you have already defined a custom function in BigQuery, either via the + # BigQuery Google Cloud Console or with the `udf` decorator, + # or otherwise, you may use it with BigQuery DataFrames with the + # `read_gbq_function` method. More details are available via the `help` + # command. 
+ help(bpd.read_gbq_function) + + existing_get_bucket_bq_udf = get_bucket.bigframes_bigquery_function + + # Here is an example of using `read_gbq_function` to load an existing + # BigQuery Python UDF. + df = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins") + get_bucket_function = bpd.read_gbq_function(existing_get_bucket_bq_udf) + + df = df.assign(body_mass_bucket=df["body_mass_g"].apply(get_bucket_function)) + df.peek(10) + + # Let's continue trying other potential use cases of udf. Let's say we + # consider the `species`, `island` and `sex` of the penguins sensitive + # information and want to redact that by replacing with their hash code + # instead. Let's define another scalar custom function and decorate it + # as a udf. The custom function in this example has external package + # dependency, which can be specified via `packages` parameter. + @bpd.udf( + dataset=your_bq_dataset_id, + name=your_bq_routine_id, + packages=["cryptography"], + ) + def get_hash(input: str) -> str: + from cryptography.fernet import Fernet + + # handle missing value + if input is None: + input = "" + + key = Fernet.generate_key() + f = Fernet(key) + return f.encrypt(input.encode()).decode() + + # We can use this udf in another `pandas`-like API `map` that + # can be applied on a DataFrame + df_redacted = df[["species", "island", "sex"]].map(get_hash) + df_redacted.peek(10) + + # [END bigquery_dataframes_udf] + + # Clean up cloud artifacts + session = bpd.get_global_session() + session.bqclient.delete_routine( + f"{your_bq_dataset_id}.{your_bq_routine_id}", not_found_ok=True + ) diff --git a/samples/snippets/udf_test.py b/samples/snippets/udf_test.py new file mode 100644 index 0000000000..c660b155c2 --- /dev/null +++ b/samples/snippets/udf_test.py @@ -0,0 +1,64 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import google.api_core.exceptions +import google.cloud.bigquery_connection_v1 +import pytest + +import bigframes.pandas + +from . import udf + + +# TODO(tswast): Once the connections are cleaned up in the sample test project +# and https://github.com/GoogleCloudPlatform/python-docs-samples/issues/11720 +# is closed, we shouldn't need this because AFAIK we only use one BQ connection +# in this sample. +@pytest.fixture(autouse=True) +def cleanup_connections() -> None: + client = google.cloud.bigquery_connection_v1.ConnectionServiceClient() + + for conn in client.list_connections( + parent="projects/python-docs-samples-tests/locations/us" + ): + try: + int(conn.name.split("/")[-1].split("-")[0], base=16) + except ValueError: + print(f"Couldn't parse {conn.name}") + continue + + try: + print(f"removing {conn.name}") + client.delete_connection( + google.cloud.bigquery_connection_v1.DeleteConnectionRequest( + {"name": conn.name}, + ) + ) + except google.api_core.exceptions.GoogleAPIError: + # We did as much clean up as we can. + break + + +def test_udf_and_read_gbq_function( + capsys: pytest.CaptureFixture[str], + project_id: str, + dataset_id: str, + routine_id: str, +) -> None: + # We need a fresh session since we're modifying connection options. 
+ bigframes.pandas.close_session() + udf.run_udf_and_read_gbq_function(project_id, dataset_id, routine_id) + out, _ = capsys.readouterr() + assert "Created BQ Python UDF:" in out From ea8f721221c4447683549655cd0129529f2be4cc Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Wed, 23 Apr 2025 01:29:07 +0000 Subject: [PATCH 2/4] remove connection cleanup, not needed for udf --- samples/snippets/udf_test.py | 31 ------------------------------- 1 file changed, 31 deletions(-) diff --git a/samples/snippets/udf_test.py b/samples/snippets/udf_test.py index c660b155c2..587f2b387e 100644 --- a/samples/snippets/udf_test.py +++ b/samples/snippets/udf_test.py @@ -12,8 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import google.api_core.exceptions -import google.cloud.bigquery_connection_v1 import pytest import bigframes.pandas @@ -21,35 +19,6 @@ from . import udf -# TODO(tswast): Once the connections are cleaned up in the sample test project -# and https://github.com/GoogleCloudPlatform/python-docs-samples/issues/11720 -# is closed, we shouldn't need this because AFAIK we only use one BQ connection -# in this sample. -@pytest.fixture(autouse=True) -def cleanup_connections() -> None: - client = google.cloud.bigquery_connection_v1.ConnectionServiceClient() - - for conn in client.list_connections( - parent="projects/python-docs-samples-tests/locations/us" - ): - try: - int(conn.name.split("/")[-1].split("-")[0], base=16) - except ValueError: - print(f"Couldn't parse {conn.name}") - continue - - try: - print(f"removing {conn.name}") - client.delete_connection( - google.cloud.bigquery_connection_v1.DeleteConnectionRequest( - {"name": conn.name}, - ) - ) - except google.api_core.exceptions.GoogleAPIError: - # We did as much clean up as we can. 
- break - - def test_udf_and_read_gbq_function( capsys: pytest.CaptureFixture[str], project_id: str, From eb5ae56428785883fec224f54db2b173a2a39de9 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Wed, 23 Apr 2025 22:16:59 +0000 Subject: [PATCH 3/4] use bigframes project for doctest --- samples/snippets/udf_test.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/samples/snippets/udf_test.py b/samples/snippets/udf_test.py index 587f2b387e..a352b4c8ce 100644 --- a/samples/snippets/udf_test.py +++ b/samples/snippets/udf_test.py @@ -21,13 +21,18 @@ def test_udf_and_read_gbq_function( capsys: pytest.CaptureFixture[str], - project_id: str, dataset_id: str, routine_id: str, ) -> None: # We need a fresh session since we're modifying connection options. bigframes.pandas.close_session() - udf.run_udf_and_read_gbq_function(project_id, dataset_id, routine_id) + # Determine project id, in this case prefer the one set in the environment + # variable GOOGLE_CLOUD_PROJECT (if any) + import os + + your_project_id = os.getenv("GOOGLE_CLOUD_PROJECT", "bigframes-dev") + + udf.run_udf_and_read_gbq_function(your_project_id, dataset_id, routine_id) out, _ = capsys.readouterr() assert "Created BQ Python UDF:" in out From d63dc61d6e4f0a0759b204439f93c0ddbdd7945a Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Thu, 24 Apr 2025 19:24:33 +0000 Subject: [PATCH 4/4] restore python version agnostic logic for udf --- bigframes/functions/_function_session.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/bigframes/functions/_function_session.py b/bigframes/functions/_function_session.py index 7fb5cc114b..e18f7084db 100644 --- a/bigframes/functions/_function_session.py +++ b/bigframes/functions/_function_session.py @@ -838,9 +838,18 @@ def wrapper(func): TypeError, f"func must be a callable, got {func}" ) - # Managed function supports version >= 3.11. 
- signature_kwargs: Mapping[str, Any] = {"eval_str": True} - signature = inspect.signature(func, **signature_kwargs) + if sys.version_info >= (3, 10): + # Add `eval_str = True` so that deferred annotations are turned into their + # corresponding type objects. Need Python 3.10 for eval_str parameter. + # https://docs.python.org/3/library/inspect.html#inspect.signature + signature_kwargs: Mapping[str, Any] = {"eval_str": True} + else: + signature_kwargs = {} # type: ignore + + signature = inspect.signature( + func, + **signature_kwargs, + ) # Try to get input types via type annotations. if input_types is None: