8000 test: restore remote function stickiness in small tests by shobsi · Pull Request #847 · googleapis/python-bigquery-dataframes · GitHub
[go: up one dir, main page]

Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 25 additions & 20 deletions bigframes/functions/remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,12 @@ class IbisSignature(NamedTuple):
output_type: IbisDataType


def get_cloud_function_name(function_hash, session_id, uniq_suffix=None):
def get_cloud_function_name(function_hash, session_id=None, uniq_suffix=None):
"Get a name for the cloud function for the given user defined function."
parts = [_BIGFRAMES_REMOTE_FUNCTION_PREFIX, session_id, function_hash]
parts = [_BIGFRAMES_REMOTE_FUNCTION_PREFIX]
if session_id:
parts.append(session_id)
parts.append(function_hash)
if uniq_suffix:
parts.append(uniq_suffix)
return _GCF_FUNCTION_NAME_SEPERATOR.join(parts)
Expand Down Expand Up @@ -566,10 +569,13 @@ def provision_bq_remote_function(
)

# Derive the name of the cloud function underlying the intended BQ
# remote function, also collect updated package requirements as
# determined in the name resolution
# remote function. Use the session id to identify the GCF for unnamed
# functions. The named remote functions are treated as persistent
# artifacts, so let's keep them independent of session id, which also
# makes their naming more stable for the same udf code
session_id = None if name else self._session.session_id
cloud_function_name = get_cloud_function_name(
function_hash, self._session.session_id, uniq_suffix
function_hash, session_id, uniq_suffix
)
cf_endpoint = self.get_cloud_function_endpoint(cloud_function_name)

Expand Down Expand Up @@ -635,13 +641,12 @@ def get_remote_function_specs(self, remote_function_name):
)
try:
for routine in routines:
routine = cast(bigquery.Routine, routine)
if routine.reference.routine_id == remote_function_name:
# TODO(shobs): Use first class properties when they are available
# https://github.com/googleapis/python-bigquery/issues/1552
rf_options = routine._properties.get("remoteFunctionOptions")
rf_options = routine.remote_function_options
if rf_options:
http_endpoint = rf_options.get("endpoint")
bq_connection = rf_options.get("connection")
http_endpoint = rf_options.endpoint
bq_connection = rf_options.connection
if bq_connection:
bq_connection = os.path.basename(bq_connection)
break
Expand Down Expand Up @@ -731,15 +736,15 @@ class _RemoteFunctionSession:

def __init__(self):
# Session level mapping of remote function artifacts
self._temp_session_artifacts: Dict[str, str] = dict()
self._temp_artifacts: Dict[str, str] = dict()

# Lock to synchronize the update of the session level mapping
self._session_artifacts_lock = threading.Lock()
# Lock to synchronize the update of the session artifacts
self._artifacts_lock = threading.Lock()

def _update_artifacts(self, bqrf_routine: str, gcf_path: str):
def _update_temp_artifacts(self, bqrf_routine: str, gcf_path: str):
"""Update remote function artifacts in the current session."""
with self._session_artifacts_lock:
self._temp_session_artifacts[bqrf_routine] = gcf_path
with self._artifacts_lock:
self._temp_artifacts[bqrf_routine] = gcf_path

def clean_up(
self,
Expand All @@ -748,8 +753,8 @@ def clean_up(
session_id: str,
):
"""Delete remote function artifacts in the current session."""
with self._session_artifacts_lock:
for bqrf_routine, gcf_path in self._temp_session_artifacts.items():
with self._artifacts_lock:
for bqrf_routine, gcf_path in self._temp_artifacts.items():
# Let's accept the possibility that the remote function may have
# been deleted directly by the user
bqclient.delete_routine(bqrf_routine, not_found_ok=True)
Expand All @@ -761,7 +766,7 @@ def clean_up(
except google.api_core.exceptions.NotFound:
pass

self._temp_session_artifacts.clear()
self._temp_artifacts.clear()

# Inspired by @udf decorator implemented in ibis-bigquery package
# https://github.com/ibis-project/ibis-bigquery/blob/main/ibis_bigquery/udf/__init__.py
Expand Down Expand Up @@ -1206,7 +1211,7 @@ def try_delattr(attr):
# explicit name, we are assuming that the user wants to persist them
# with that name and would directly manage their lifecycle.
if created_new and (not name):
self._update_artifacts(
self._update_temp_artifacts(
func.bigframes_remote_function, func.bigframes_cloud_function
)
return func
Expand Down
24 changes: 21 additions & 3 deletions bigframes/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -847,10 +847,28 @@ def clean_up_by_session_id(
option_context = config.option_context
"""Global :class:`~bigframes._config.option_context` to configure BigQuery DataFrames."""


# Session management APIs
get_global_session = global_session.get_global_session
close_session = global_session.close_session
reset_session = global_session.close_session
def get_global_session():
    # Delegate to the implementation in ``global_session``; the target is
    # looked up on each call rather than bound once at import time.
    session = global_session.get_global_session()
    return session


# Expose the wrapped function's documentation on this public wrapper.
get_global_session.__doc__ = global_session.get_global_session.__doc__


def close_session():
    # Delegate to the implementation in ``global_session``; the target is
    # looked up on each call rather than bound once at import time.
    outcome = global_session.close_session()
    return outcome


# Expose the wrapped function's documentation on this public wrapper.
close_session.__doc__ = global_session.close_session.__doc__


def reset_session():
    # Resetting is carried out by closing the global session: this calls the
    # same ``global_session.close_session`` that ``close_session`` wraps.
    outcome = global_session.close_session()
    return outcome


# The documentation is taken from ``close_session``, the function it wraps.
reset_session.__doc__ = global_session.close_session.__doc__


# SQL Compilation uses recursive algorithms on deep trees
# 10M tree depth should be sufficient to generate any sql that is under bigquery limit
Expand Down
2 changes: 1 addition & 1 deletion bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ def _clean_up_tables(self):
def close(self):
"""Delete resources that were created with this session's session_id.
This includes BigQuery tables, remote functions and cloud functions
serving the remote functions"""
serving the remote functions."""
self._clean_up_tables()
self._remote_function_session.clean_up(
self.bqclient, self.cloudfunctionsclient, self.session_id
Expand Down
Loading
0