fix: exceptions raised in `apply` from a `remote_function` now surface in the client by shobsi · Pull Request #387 · googleapis/python-bigquery-dataframes · GitHub
[go: up one dir, main page]

Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 13 additions & 22 deletions bigframes/functions/remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import random
import shutil
import string
import subprocess
import sys
import tempfile
import textwrap
Expand Down Expand Up @@ -87,19 +86,6 @@ def _get_hash(def_, package_requirements=None):
return hashlib.md5(def_repr).hexdigest()


def _run_system_command(command):
program = subprocess.Popen(
[command], stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True
)
stdout, stderr = program.communicate()
exit_code = program.wait()
if exit_code:
raise RuntimeError(
f"Command: {command}\nOutput: {stdout.decode()}\nError: {stderr.decode()}"
f"{constants.FEEDBACK_LINK}"
)


def routine_ref_to_string_for_query(routine_ref: bigquery.RoutineReference) -> str:
    """Render a routine reference in the form embedded in a query string.

    The project and dataset ids are joined with a dot and wrapped together
    in a single pair of backticks; the routine id follows after a dot,
    unquoted.
    """
    project = routine_ref.project
    dataset = routine_ref.dataset_id
    routine = routine_ref.routine_id
    return f"`{project}.{dataset}`.{routine}"

Expand Down Expand Up @@ -281,6 +267,8 @@ def generate_cloud_function_main_code(self, def_, dir):
code_template = textwrap.dedent(
"""\
import cloudpickle
import functions_framework
from flask import jsonify
import json

# original udf code is in {udf_code_file}
Expand All @@ -289,14 +277,17 @@ def generate_cloud_function_main_code(self, def_, dir):
udf = cloudpickle.load(f)

def {handler_func_name}(request):
request_json = request.get_json(silent=True)
calls = request_json["calls"]
replies = []
for call in calls:
reply = udf(*call)
replies.append(reply)
return_json = json.dumps({{"replies" : replies}})
return return_json
try:
request_json = request.get_json(silent=True)
calls = request_json["calls"]
replies = []
for call in calls:
reply = udf(*call)
replies.append(reply)
return_json = json.dumps({{"replies" : replies}})
return return_json
except Exception as e:
return jsonify( {{ "errorMessage": str(e) }} ), 400
"""
)

Expand Down
24 changes: 23 additions & 1 deletion tests/system/large/test_remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import tempfile
import textwrap

from google.api_core.exceptions import NotFound, ResourceExhausted
from google.api_core.exceptions import BadRequest, NotFound, ResourceExhausted
from google.cloud import bigquery, functions_v2
import pandas
import pytest
Expand Down Expand Up @@ -1214,6 +1214,28 @@ def square(x):
)


@pytest.mark.flaky(retries=2, delay=120)
def test_remote_function_runtime_error(session, scalars_dfs, dataset_id):
    """An exception raised inside a remote function reaches the client.

    ``square`` is registered as a remote function and applied to
    ``int64_col``; the call is expected to fail with ``BadRequest`` whose
    message matches the 400 / ``errorMessage`` pattern below.
    """
    expected_error = "400.*errorMessage.*unsupported operand type"
    try:

        @session.remote_function([int], int, dataset=dataset_id)
        def square(x):
            return x * x

        scalars_df, _ignored = scalars_dfs

        with pytest.raises(BadRequest, match=expected_error):
            # Per the original author's note: int64_col has nulls, which
            # should make `square` raise.
            applied = scalars_df["int64_col"].apply(square)
            applied.to_pandas()
    finally:
        # Remove the GCP assets created for the remote function.
        cleanup_remote_function_assets(
            session.bqclient, session.cloudfunctionsclient, square
        )


@pytest.mark.flaky(retries=2, delay=120)
def test_remote_function_anonymous_dataset(session, scalars_dfs):
try:
Expand Down
0