diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py
index ded4760fa3..833f0d1d1d 100644
--- a/bigframes/pandas/__init__.py
+++ b/bigframes/pandas/__init__.py
@@ -400,6 +400,7 @@ def remote_function(
     bigquery_connection: Optional[str] = None,
     reuse: bool = True,
     name: Optional[str] = None,
+    packages: Optional[Sequence[str]] = None,
 ):
     return global_session.with_default_session(
         bigframes.session.Session.remote_function,
@@ -409,6 +410,7 @@ def remote_function(
         bigquery_connection=bigquery_connection,
         reuse=reuse,
         name=name,
+        packages=packages,
     )
diff --git a/bigframes/remote_function.py b/bigframes/remote_function.py
index fd9aec825f..c82ba84056 100644
--- a/bigframes/remote_function.py
+++ b/bigframes/remote_function.py
@@ -100,9 +100,12 @@ def get_remote_function_locations(bq_location):
     return bq_location, cloud_function_region
 
 
-def _get_hash(def_):
+def _get_hash(def_, package_requirements=None):
     "Get hash (32 digits alphanumeric) of a function."
     def_repr = cloudpickle.dumps(def_, protocol=_pickle_protocol_version)
+    if package_requirements:
+        for p in sorted(package_requirements):
+            def_repr += p.encode()
     return hashlib.md5(def_repr).hexdigest()
 
 
@@ -129,18 +132,18 @@ class IbisSignature(NamedTuple):
     output_type: IbisDataType
 
 
-def get_cloud_function_name(def_, uniq_suffix=None):
+def get_cloud_function_name(def_, uniq_suffix=None, package_requirements=None):
     "Get a name for the cloud function for the given user defined function."
-    cf_name = _get_hash(def_)
+    cf_name = _get_hash(def_, package_requirements)
     cf_name = f"bigframes-{cf_name}"  # for identification
     if uniq_suffix:
         cf_name = f"{cf_name}-{uniq_suffix}"
     return cf_name
 
 
-def get_remote_function_name(def_, uniq_suffix=None):
+def get_remote_function_name(def_, uniq_suffix=None, package_requirements=None):
     "Get a name for the BQ remote function for the given user defined function."
-    bq_rf_name = _get_hash(def_)
+    bq_rf_name = _get_hash(def_, package_requirements)
     bq_rf_name = f"bigframes_{bq_rf_name}"  # for identification
     if uniq_suffix:
         bq_rf_name = f"{bq_rf_name}_{uniq_suffix}"
@@ -200,7 +203,8 @@ def create_bq_remote_function(
             RETURNS {bq_function_return_type}
             REMOTE WITH CONNECTION `{self._gcp_project_id}.{self._bq_location}.{self._bq_connection_id}`
             OPTIONS (
-              endpoint = "{endpoint}"
+              endpoint = "{endpoint}",
+              max_batching_rows = 1000
             )"""
 
         logger.info(f"Creating BQ remote function: {create_function_ddl}")
@@ -320,11 +324,14 @@ def {handler_func_name}(request):
 
         return handler_func_name
 
-    def generate_cloud_function_code(self, def_, dir):
+    def generate_cloud_function_code(self, def_, dir, package_requirements=None):
         """Generate the cloud function code for a given user defined function."""
 
         # requirements.txt
         requirements = ["cloudpickle >= 2.1.0"]
+        if package_requirements:
+            requirements.extend(package_requirements)
+        requirements = sorted(requirements)
         requirements_txt = os.path.join(dir, "requirements.txt")
         with open(requirements_txt, "w") as f:
             f.write("\n".join(requirements))
@@ -333,12 +340,14 @@ def generate_cloud_function_code(self, def_, dir):
         entry_point = self.generate_cloud_function_main_code(def_, dir)
         return entry_point
 
-    def create_cloud_function(self, def_, cf_name):
+    def create_cloud_function(self, def_, cf_name, package_requirements=None):
         """Create a cloud function from the given user defined function."""
 
         # Build and deploy folder structure containing cloud function
         with tempfile.TemporaryDirectory() as dir:
-            entry_point = self.generate_cloud_function_code(def_, dir)
+            entry_point = self.generate_cloud_function_code(
+                def_, dir, package_requirements
+            )
             archive_path = shutil.make_archive(dir, "zip", dir)
 
             # We are creating cloud function source code from the currently running
@@ -392,6 +401,9 @@ def create_cloud_function(self, def_, cf_name):
             function.build_config.source.storage_source.object_ = (
                 upload_url_response.storage_source.object_
             )
+            function.service_config = functions_v2.ServiceConfig()
+            function.service_config.available_memory = "1024M"
+            function.service_config.timeout_seconds = 600
             create_function_request.function = function
 
             # Create the cloud function and wait for it to be ready to use
@@ -422,6 +434,7 @@ def provision_bq_remote_function(
         output_type,
         reuse,
         name,
+        package_requirements,
     ):
         """Provision a BigQuery remote function."""
         # If reuse of any existing function with the same name (indicated by the
@@ -435,19 +448,25 @@ def provision_bq_remote_function(
 
         # Derive the name of the cloud function underlying the intended BQ
         # remote function
-        cloud_function_name = get_cloud_function_name(def_, uniq_suffix)
+        cloud_function_name = get_cloud_function_name(
+            def_, uniq_suffix, package_requirements
+        )
         cf_endpoint = self.get_cloud_function_endpoint(cloud_function_name)
 
         # Create the cloud function if it does not exist
         if not cf_endpoint:
-            cf_endpoint = self.create_cloud_function(def_, cloud_function_name)
+            cf_endpoint = self.create_cloud_function(
+                def_, cloud_function_name, package_requirements
+            )
         else:
             logger.info(f"Cloud function {cloud_function_name} already exists.")
 
         # Derive the name of the remote function
         remote_function_name = name
         if not remote_function_name:
-            remote_function_name = get_remote_function_name(def_, uniq_suffix)
+            remote_function_name = get_remote_function_name(
+                def_, uniq_suffix, package_requirements
+            )
         rf_endpoint, rf_conn = self.get_remote_function_specs(remote_function_name)
 
         # Create the BQ remote function in following circumstances:
@@ -619,6 +638,7 @@ def remote_function(
     bigquery_connection: Optional[str] = None,
     reuse: bool = True,
     name: Optional[str] = None,
+    packages: Optional[Sequence[str]] = None,
 ):
     """Decorator to turn a user defined function into a BigQuery remote function.
 
@@ -710,6 +730,10 @@ def remote_function(
             caution, because two users working in the same project and dataset
             could overwrite each other's remote functions if they use the same
             persistent name.
+        packages (str[], Optional):
+            Explicit names of the external package dependencies. Each dependency
+            is added to the `requirements.txt` as is, and can be in any format
+            supported by https://pip.pypa.io/en/stable/reference/requirements-file-format/.
     """
     import bigframes.pandas as bpd
 
@@ -821,6 +845,7 @@ def wrapper(f):
            ibis_signature.output_type,
            reuse,
            name,
+           packages,
        )
 
        node = remote_function_node(dataset_ref.routine(rf_name), ibis_signature)
diff --git a/bigframes/session.py b/bigframes/session.py
index 6c1160c88e..fa5b415350 100644
--- a/bigframes/session.py
+++ b/bigframes/session.py
@@ -1413,6 +1413,7 @@ def remote_function(
         bigquery_connection: Optional[str] = None,
         reuse: bool = True,
         name: Optional[str] = None,
+        packages: Optional[Sequence[str]] = None,
     ):
         """Decorator to turn a user defined function into a BigQuery remote function. Check out
         the code samples at: https://cloud.google.com/bigquery/docs/remote-functions#bigquery-dataframes.
@@ -1467,7 +1468,7 @@ def remote_function(
                 Name of the BigQuery connection. You should either have the
                 connection already created in the `location` you have chosen, or
                 you should have the Project IAM Admin role to enable the service
-                to create the connection for you if you need it.If this parameter is
+                to create the connection for you if you need it. If this parameter is
                 not provided then the BigQuery connection from the session is used.
             reuse (bool, Optional):
                 Reuse the remote function if already exists.
@@ -1482,6 +1483,10 @@ def remote_function(
                 caution, because two users working in the same project and dataset
                 could overwrite each other's remote functions if they use the same
                 persistent name.
+            packages (str[], Optional):
+                Explicit names of the external package dependencies. Each dependency
+                is added to the `requirements.txt` as is, and can be in any format
+                supported by https://pip.pypa.io/en/stable/reference/requirements-file-format/.
         Returns:
             callable: A remote function object pointing to the cloud assets created
             in the background to support the remote execution. The cloud assets can be
@@ -1499,6 +1504,7 @@ def remote_function(
             bigquery_connection=bigquery_connection,
             reuse=reuse,
             name=name,
+            packages=packages,
         )
 
     def read_gbq_function(
diff --git a/samples/snippets/remote_function.py b/samples/snippets/remote_function.py
index 9998a23eb2..646d7b0c30 100644
--- a/samples/snippets/remote_function.py
+++ b/samples/snippets/remote_function.py
@@ -89,19 +89,25 @@ def get_bucket(num):
     # say we consider the `species`, `island` and `sex` of the penguins
     # sensitive information and want to redact that by replacing with their hash
     # code instead. Let's define another scalar custom function and decorate it
-    # as a remote function
+    # as a remote function. The custom function in this example has an external
+    # package dependency, which can be specified via the `packages` parameter.
     @bpd.remote_function(
-        [str], str, bigquery_connection="bigframes-rf-conn", reuse=False
+        [str],
+        str,
+        bigquery_connection="bigframes-rf-conn",
+        reuse=False,
+        packages=["cryptography"],
     )
     def get_hash(input):
-        import hashlib
+        from cryptography.fernet import Fernet
 
         # handle missing value
         if input is None:
             input = ""
-        encoded_input = input.encode()
-        hash = hashlib.md5(encoded_input)
-        return hash.hexdigest()
+
+        key = Fernet.generate_key()
+        f = Fernet(key)
+        return f.encrypt(input.encode()).decode()
 
     # We can use this remote function in another `pandas`-like API `map` that
     # can be applied on a DataFrame
diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py
index f270099182..730a1dbde4 100644
--- a/tests/system/large/test_remote_function.py
+++ b/tests/system/large/test_remote_function.py
@@ -916,6 +916,51 @@ def square(x):
     )
 
 
+@pytest.mark.flaky(retries=2, delay=120)
+def test_remote_function_with_external_package_dependencies(
+    session, scalars_dfs, dataset_id, bq_cf_connection, functions_client
+):
+    try:
+
+        def pd_np_foo(x):
+            import numpy as mynp
+            import pandas as mypd
+
+            return mypd.Series([x, mynp.sqrt(mynp.abs(x))]).sum()
+
+        # Create the remote function with explicit package dependencies
+        pd_np_foo_remote = session.remote_function(
+            [int],
+            float,
+            dataset_id,
+            bq_cf_connection,
+            reuse=False,
+            packages=["numpy", "pandas >= 2.0.0"],
+        )(pd_np_foo)
+
+        # The behavior of the created remote function should be as expected
+        scalars_df, scalars_pandas_df = scalars_dfs
+
+        bf_int64_col = scalars_df["int64_too"]
+        bf_result_col = bf_int64_col.apply(pd_np_foo_remote)
+        bf_result = bf_int64_col.to_frame().assign(result=bf_result_col).to_pandas()
+
+        pd_int64_col = scalars_pandas_df["int64_too"]
+        pd_result_col = pd_int64_col.apply(pd_np_foo)
+        pd_result = pd_int64_col.to_frame().assign(result=pd_result_col)
+
+        # pandas result is a non-nullable float64; cast it to Float64 before
+        # comparing for the purpose of this test
+        pd_result.result = pd_result.result.astype(pandas.Float64Dtype())
+
+        assert_pandas_df_equal_ignore_ordering(bf_result, pd_result)
+    finally:
+        # clean up the gcp assets created for the remote function
+        cleanup_remote_function_assets(
+            session.bqclient, functions_client, pd_np_foo_remote
+        )
+
+
 @pytest.mark.flaky(retries=2, delay=120)
 def test_remote_function_with_explicit_name_reuse(
     session, scalars_dfs, dataset_id, bq_cf_connection, functions_client
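
Note: the reuse semantics in this change hinge on `_get_hash` folding the package list into the function hash, so the same UDF deployed with a different `packages` list gets a fresh cloud function and BQ remote function name instead of silently reusing one built with stale dependencies. A minimal standalone sketch of that behavior, mirroring the `bigframes/remote_function.py` hunk above (the hardcoded pickle protocol `4` stands in for the module's `_pickle_protocol_version`; `f` is a toy function for illustration):

import hashlib

import cloudpickle


def _get_hash(def_, package_requirements=None):
    "Get hash (32 digits alphanumeric) of a function."
    # Serialize the function, then append the sorted requirement strings so
    # that the same function with different dependencies hashes differently.
    def_repr = cloudpickle.dumps(def_, protocol=4)
    if package_requirements:
        for p in sorted(package_requirements):
            def_repr += p.encode()
    return hashlib.md5(def_repr).hexdigest()


def f(x):
    return x + 1


# A different package list yields a different hash, hence a different
# cloud function name and no accidental reuse.
assert _get_hash(f) != _get_hash(f, ["numpy", "pandas >= 2.0.0"])

# Sorting makes the hash insensitive to the order packages are listed in.
assert _get_hash(f, ["pandas", "numpy"]) == _get_hash(f, ["numpy", "pandas"])

Since each requirement string is appended as is, the hash (like the generated requirements.txt) is still sensitive to cosmetic differences within an entry, e.g. "pandas>=2.0.0" and "pandas >= 2.0.0" provision separate cloud functions.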