From 50364d8fa81b90c7c0ba0ba2f816388154d34511 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Sat, 21 Sep 2024 09:17:25 +0000 Subject: [PATCH 1/4] feat: support ingress settings in `remote_function` --- .gitignore | 2 + .../functions/_remote_function_client.py | 22 ++++++ .../functions/_remote_function_session.py | 21 +++++- bigframes/session/__init__.py | 9 +++ tests/system/large/test_remote_function.py | 69 +++++++++++++++++++ 5 files changed, 122 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index d083ea1ddc..ed62109f27 100644 --- a/.gitignore +++ b/.gitignore @@ -51,6 +51,8 @@ docs.metadata # Virtual environment env/ venv/ +venv1/ +venv2/ # Test logs coverage.xml diff --git a/bigframes/functions/_remote_function_client.py b/bigframes/functions/_remote_function_client.py index 75385f11a5..e004ecc12f 100644 --- a/bigframes/functions/_remote_function_client.py +++ b/bigframes/functions/_remote_function_client.py @@ -23,6 +23,7 @@ import string import sys import tempfile +import types from typing import cast, Tuple, TYPE_CHECKING from bigframes_vendored import constants @@ -43,6 +44,15 @@ logger = logging.getLogger(__name__) +# https://cloud.google.com/sdk/gcloud/reference/functions/deploy#--ingress-settings +_INGRESS_SETTINGS_MAP = types.MappingProxyType( + { + "all": functions_v2.ServiceConfig.IngressSettings.ALLOW_ALL, + "internal-only": functions_v2.ServiceConfig.IngressSettings.ALLOW_INTERNAL_ONLY, + "internal-and-gclb": functions_v2.ServiceConfig.IngressSettings.ALLOW_INTERNAL_AND_GCLB, + } +) + class RemoteFunctionClient: # Wait time (in seconds) for an IAM binding to take effect after creation @@ -228,6 +238,7 @@ def create_cloud_function( is_row_processor=False, vpc_connector=None, memory_mib=1024, + ingress_settings="all", ): """Create a cloud function from the given user defined function. 
@@ -324,6 +335,15 @@ def create_cloud_function( function.service_config.service_account_email = ( self._cloud_function_service_account ) + if ingress_settings not in _INGRESS_SETTINGS_MAP: + raise ValueError( + "'{}' not one of the supported ingress settings values: {}".format( + ingress_settings, list(_INGRESS_SETTINGS_MAP) + ) + ) + function.service_config.ingress_settings = _INGRESS_SETTINGS_MAP.get( + ingress_settings + ) function.kms_key_name = self._cloud_function_kms_key_name create_function_request.function = function @@ -372,6 +392,7 @@ def provision_bq_remote_function( is_row_processor, cloud_function_vpc_connector, cloud_function_memory_mib, + cloud_function_ingress_settings, ): """Provision a BigQuery remote function.""" # Augment user package requirements with any internal package @@ -418,6 +439,7 @@ def provision_bq_remote_function( is_row_processor=is_row_processor, vpc_connector=cloud_function_vpc_connector, memory_mib=cloud_function_memory_mib, + ingress_settings=cloud_function_ingress_settings, ) else: logger.info(f"Cloud function {cloud_function_name} already exists.") diff --git a/bigframes/functions/_remote_function_session.py b/bigframes/functions/_remote_function_session.py index 6bc7a4b079..a924dbd9c5 100644 --- a/bigframes/functions/_remote_function_session.py +++ b/bigframes/functions/_remote_function_session.py @@ -19,7 +19,17 @@ import inspect import sys import threading -from typing import Any, cast, Dict, Mapping, Optional, Sequence, TYPE_CHECKING, Union +from typing import ( + Any, + cast, + Dict, + Literal, + Mapping, + Optional, + Sequence, + TYPE_CHECKING, + Union, +) import warnings import bigframes_vendored.constants as constants @@ -110,6 +120,9 @@ def remote_function( cloud_function_max_instances: Optional[int] = None, cloud_function_vpc_connector: Optional[str] = None, cloud_function_memory_mib: Optional[int] = 1024, + cloud_function_ingress_settings: Literal[ + "all", "internal-only", "internal-and-gclb" + ] = "all", ): 
"""Decorator to turn a user defined function into a BigQuery remote function. @@ -280,6 +293,11 @@ def remote_function( default memory of cloud functions be allocated, pass `None`. See for more details https://cloud.google.com/functions/docs/configuring/memory. + cloud_function_ingress_settings (str, Optional): + Ingress settings controls dictating what traffic can reach the + function. By default `all` will be used. It must be one of: + `all`, `internal-only`, `internal-and-gclb`. See for more details + https://cloud.google.com/functions/docs/networking/network-settings#ingress_settings. """ # Some defaults may be used from the session if not provided otherwise import bigframes.exceptions as bf_exceptions @@ -504,6 +522,7 @@ def try_delattr(attr): is_row_processor=is_row_processor, cloud_function_vpc_connector=cloud_function_vpc_connector, cloud_function_memory_mib=cloud_function_memory_mib, + cloud_function_ingress_settings=cloud_function_ingress_settings, ) # TODO(shobs): Find a better way to support udfs with param named "name". diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 7d0cfaee5c..3a9cba442c 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1040,6 +1040,9 @@ def remote_function( cloud_function_max_instances: Optional[int] = None, cloud_function_vpc_connector: Optional[str] = None, cloud_function_memory_mib: Optional[int] = 1024, + cloud_function_ingress_settings: Literal[ + "all", "internal-only", "internal-and-gclb" + ] = "all", ): """Decorator to turn a user defined function into a BigQuery remote function. Check out the code samples at: https://cloud.google.com/bigquery/docs/remote-functions#bigquery-dataframes. @@ -1194,6 +1197,11 @@ def remote_function( default memory of cloud functions be allocated, pass `None`. See for more details https://cloud.google.com/functions/docs/configuring/memory. 
+ cloud_function_ingress_settings (str, Optional): + Ingress settings controls dictating what traffic can reach the + function. By default `all` will be used. It must be one of: + `all`, `internal-only`, `internal-and-gclb`. See for more details + https://cloud.google.com/functions/docs/networking/network-settings#ingress_settings. Returns: callable: A remote function object pointing to the cloud assets created in the background to support the remote execution. The cloud assets can be @@ -1220,6 +1228,7 @@ def remote_function( cloud_function_max_instances=cloud_function_max_instances, cloud_function_vpc_connector=cloud_function_vpc_connector, cloud_function_memory_mib=cloud_function_memory_mib, + cloud_function_ingress_settings=cloud_function_ingress_settings, ) def read_gbq_function( diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index e224f65a01..18d2609347 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -1690,6 +1690,9 @@ def analyze(row): ), ), id="multiindex", + marks=pytest.mark.skip( + reason="TODO(b/368639580) revert this skip after fix" + ), ), pytest.param( pandas.DataFrame( @@ -2170,3 +2173,69 @@ def foo(x): cleanup_remote_function_assets( session.bqclient, session.cloudfunctionsclient, foo ) + + +@pytest.mark.parametrize( + ("ingress_settings_args", "effective_ingress_settings"), + [ + pytest.param( + {}, functions_v2.ServiceConfig.IngressSettings.ALLOW_ALL, id="no-set" + ), + pytest.param( + {"cloud_function_ingress_settings": "all"}, + functions_v2.ServiceConfig.IngressSettings.ALLOW_ALL, + id="set-all", + ), + pytest.param( + {"cloud_function_ingress_settings": "internal-only"}, + functions_v2.ServiceConfig.IngressSettings.ALLOW_INTERNAL_ONLY, + id="set-internal-only", + ), + pytest.param( + {"cloud_function_ingress_settings": "internal-and-gclb"}, + functions_v2.ServiceConfig.IngressSettings.ALLOW_INTERNAL_AND_GCLB, + 
id="set-internal-and-gclb", + ), + ], +) +@pytest.mark.flaky(retries=2, delay=120) +def test_remote_function_ingress_settings( + session, scalars_dfs, ingress_settings_args, effective_ingress_settings +): + try: + + def square(x: int) -> int: + return x * x + + square_remote = session.remote_function(reuse=False, **ingress_settings_args)( + square + ) + + # Assert that the GCF is created with the intended ingress settings + gcf = session.cloudfunctionsclient.get_function( + name=square_remote.bigframes_cloud_function + ) + assert gcf.service_config.ingress_settings == effective_ingress_settings + + scalars_df, scalars_pandas_df = scalars_dfs + + bf_result = scalars_df["int64_too"].apply(square_remote).to_pandas() + pd_result = scalars_pandas_df["int64_too"].apply(square) + + pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) + finally: + # clean up the gcp assets created for the remote function + cleanup_remote_function_assets( + session.bqclient, session.cloudfunctionsclient, square_remote + ) + + +@pytest.mark.flaky(retries=2, delay=120) +def test_remote_function_ingress_settings_unsupported(session): + with pytest.raises( + ValueError, match="'unknown' not one of the supported ingress settings values" + ): + + @session.remote_function(reuse=False, cloud_function_ingress_settings="unknown") + def square(x: int) -> int: + return x * x From 1e9d3bf872abebc23e0455ecfaf427dcda394b8c Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Sat, 21 Sep 2024 09:21:05 +0000 Subject: [PATCH 2/4] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20po?= =?UTF-8?q?st-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- .gitignore | 2 -- 1 file changed, 2 deletions(-) diff --git a/.gitignore b/.gitignore index ed62109f27..d083ea1ddc 100644 --- a/.gitignore +++ b/.gitignore @@ -51,8 +51,6 @@ docs.metadata # Virtual 
environment env/ venv/ -venv1/ -venv2/ # Test logs coverage.xml From 9d474c3af01a4f84c3b072a65ec108a19e5b4ff4 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Sat, 21 Sep 2024 09:30:58 +0000 Subject: [PATCH 3/4] propagate new param to bigframes.pandas module --- bigframes/pandas/__init__.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index 94ea6becab..1bdf49eaf5 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -669,6 +669,9 @@ def remote_function( cloud_function_max_instances: Optional[int] = None, cloud_function_vpc_connector: Optional[str] = None, cloud_function_memory_mib: Optional[int] = 1024, + cloud_function_ingress_settings: Literal[ + "all", "internal-only", "internal-and-gclb" + ] = "all", ): return global_session.with_default_session( bigframes.session.Session.remote_function, @@ -687,6 +690,7 @@ def remote_function( cloud_function_max_instances=cloud_function_max_instances, cloud_function_vpc_connector=cloud_function_vpc_connector, cloud_function_memory_mib=cloud_function_memory_mib, + cloud_function_ingress_settings=cloud_function_ingress_settings, ) From e847ae737ad39ca48694c6ad7af57de011a003a3 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Sat, 21 Sep 2024 09:55:03 +0000 Subject: [PATCH 4/4] fix mypy error --- bigframes/functions/_remote_function_client.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/bigframes/functions/_remote_function_client.py b/bigframes/functions/_remote_function_client.py index e004ecc12f..5acd31b425 100644 --- a/bigframes/functions/_remote_function_client.py +++ b/bigframes/functions/_remote_function_client.py @@ -341,8 +341,9 @@ def create_cloud_function( ingress_settings, list(_INGRESS_SETTINGS_MAP) ) ) - function.service_config.ingress_settings = _INGRESS_SETTINGS_MAP.get( - ingress_settings + function.service_config.ingress_settings = cast( + functions_v2.ServiceConfig.IngressSettings, + 
_INGRESS_SETTINGS_MAP[ingress_settings], ) function.kms_key_name = self._cloud_function_kms_key_name create_function_request.function = function