8000 feat(bigquery): add timeout to QueryJob.done() · googleapis/google-cloud-python@2f37317 · GitHub
[go: up one dir, main page]

Skip to content

Commit 2f37317

Browse files
committed
feat(bigquery): add timeout to QueryJob.done()
1 parent 80f5295 commit 2f37317

File tree

2 files changed

+34
-10
lines changed

2 files changed

+34
-10
lines changed

bigquery/google/cloud/bigquery/client.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1068,7 +1068,7 @@ def delete_table(self, table, retry=DEFAULT_RETRY, not_found_ok=False):
10681068
raise
10691069

10701070
def _get_query_results(
1071-
self, job_id, retry, project=None, timeout_ms=None, location=None
1071+
self, job_id, retry, project=None, timeout_ms=None, location=None, timeout=None,
10721072
):
10731073
"""Get the query results object for a query job.
10741074
@@ -1083,6 +1083,9 @@ def _get_query_results(
10831083
(Optional) number of milliseconds the the API call should
10841084
wait for the query to complete before the request times out.
10851085
location (str): Location of the query job.
1086+
timeout (Optional[float]):
1087+
The number of seconds to wait for the underlying HTTP transport
1088+
before retrying the HTTP request.
10861089
10871090
Returns:
10881091
google.cloud.bigquery.query._QueryResults:
@@ -1109,7 +1112,7 @@ def _get_query_results(
11091112
# job is complete (from QueryJob.done(), called ultimately from
11101113
# QueryJob.result()). So we don't need to poll here.
11111114
resource = self._call_api(
1112-
retry, method="GET", path=path, query_params=extra_params
1115+
retry, method="GET", path=path, query_params=extra_params, timeout=timeout
11131116
)
11141117
return _QueryResults.from_api_repr(resource)
11151118

bigquery/google/cloud/bigquery/job.py

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -663,7 +663,7 @@ def exists(self, client=None, retry=DEFAULT_RETRY):
663663
else:
664664
return True
665665

666-
def reload(self, client=None, retry=DEFAULT_RETRY):
666+
def reload(self, client=None, retry=DEFAULT_RETRY, timeout=None):
667667
"""API call: refresh job properties via a GET request.
668668
669669
See
@@ -675,6 +675,9 @@ def reload(self, client=None, retry=DEFAULT_RETRY):
675675
``client`` stored on the current dataset.
676676
677677
retry (google.api_core.retry.Retry): (Optional) How to retry the RPC.
678+
timeout (Optional[float]):
679+
The number of seconds to wait for the underlying HTTP transport
680+
before retrying the HTTP request.
678681
"""
679682
client = self._require_client(client)
680683

@@ -683,7 +686,11 @@ def reload(self, client=None, retry=DEFAULT_RETRY):
683686
extra_params["location"] = self.location
684687

685688
api_response = client._call_api(
686-
retry, method="GET", path=self.path, query_params=extra_params
689+
retry,
690+
method="GET",
691+
path=self.path,
692+
query_params=extra_params,
693+
timeout=timeout,
687694
)
688695
self._set_properties(api_response)
689696

@@ -2994,9 +3001,16 @@ def estimated_bytes_processed(self):
29943001
result = int(result)
29953002
return result
29963003

2997-
def done(self, retry=DEFAULT_RETRY):
3004+
def done(self, retry=DEFAULT_RETRY, timeout=None):
29983005
"""Refresh the job and checks if it is complete.
29993006
3007+
Args:
3008+
retry (Optional[google.api_core.retry.Retry]):
3009+
How to retry the call that retrieves query results.
3010+
timeout (Optional[float]):
3011+
The number of seconds to wait for the underlying HTTP transport
3012+
before retrying the HTTP request.
3013+
30003014
Returns:
30013015
bool: True if the job is complete, False otherwise.
30023016
"""
@@ -3007,11 +3021,17 @@ def done(self, retry=DEFAULT_RETRY):
30073021
timeout_ms = None
30083022
if self._done_timeout is not None:
30093023
# Subtract a buffer for context switching, network latency, etc.
3010-
timeout = self._done_timeout - _TIMEOUT_BUFFER_SECS
3011-
timeout = max(min(timeout, 10), 0)
3012-
self._done_timeout -= timeout
3024+
api_timeout = self._done_timeout - _TIMEOUT_BUFFER_SECS
3025+
api_timeout = max(min(api_timeout, 10), 0)
3026+
self._done_timeout -= api_timeout
30133027
self._done_timeout = max(0, self._done_timeout)
3014-
timeout_ms = int(timeout * 1000)
3028+
timeout_ms = int(api_timeout * 1000)
3029+
3030+
if timeout_ms is not None and timeout_ms > 0:
3031+
if timeout is not None:
3032+
timeout = min(timeout_ms, timeout)
3033+
else:
3034+
timeout = timeout_ms
30153035

30163036
# Do not refresh is the state is already done, as the job will not
30173037
# change once complete.
@@ -3022,13 +3042,14 @@ def done(self, retry=DEFAULT_RETRY):
30223042
project=self.project,
30233043
timeout 57A7 _ms=timeout_ms,
30243044
location=self.location,
3045+
timeout=timeout,
30253046
)
30263047

30273048
# Only reload the job once we know the query is complete.
30283049
# This will ensure that fields such as the destination table are
30293050
# correctly populated.
30303051
if self._query_results.complete:
3031-
self.reload(retry=retry)
3052+
self.reload(retry=retry, timeout=timeout)
30323053

30333054
return self.state == _DONE_STATE
30343055

0 commit comments

Comments
 (0)
0