10000 Add wait_for_job to FlowRun · tableau/server-client-python@43d7c2d · GitHub
[go: up one dir, main page]

Skip to content

Commit 43d7c2d

Browse files
Jordan Woodsjorwoods
Jordan Woods
authored andcommitted
Add wait_for_job to FlowRun
1 parent 04f235f commit 43d7c2d

File tree

2 files changed

+53
-0
lines changed

2 files changed

+53
-0
lines changed

tableauserverclient/server/endpoint/exceptions.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,3 +64,16 @@ def __str__(self):
6464
from pprint import pformat
6565

6666
return pformat(self.error)
67+
68+
69+
class FlowRunFailedException(Exception):
70+
def __init__(self, flow_run):
71+
self.background_job_id = flow_run.background_job_id
72+
self.flow_run = flow_run
73+
74+
def __str__(self):
75+
return f"FlowRun {self.flow_run.id} failed with job id {self.background_job_id}"
76+
77+
78+
class FlowRunCanceledException(FlowRunFailedException):
79+
pass

tableauserverclient/server/endpoint/flow_runs_endpoint.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
11
from .endpoint import Endpoint, QuerysetEndpoint, api
22
from .. import FlowRunItem, PaginationItem
3+
from .exceptions import FlowRunFailedException, FlowRunCanceledException
4+
import time
35

46
import logging
57

68
logger = logging.getLogger("tableau.endpoint.flowruns")
79

10+
# Polling for job completion uses exponential backoff for the sleep intervals between polls
11+
ASYNC_JOB_POLL_MIN_INTERVAL=0.5
12+
ASYNC_JOB_POLL_MAX_INTERVAL=30
13+
ASYNC_JOB_POLL_BACKOFF_FACTOR=1.4
814

915
class FlowRuns(QuerysetEndpoint):
1016
def __init__(self, parent_srv):
@@ -46,3 +52,37 @@ def cancel(self, flow_run_id):
4652
url = "{0}/{1}".format(self.baseurl, id_)
4753
self.put_request(url)
4854
logger.info("Deleted single flow (ID: {0})".format(id_))
55+
56+
57+
@api(version="3.10")
58+
def wait_for_job(self, flow_run_id, *, timeout=None):
59+
id_ = getattr(flow_run_id, "id", flow_run_id)
60+
wait_start_time = time.time()
61+
logger.debug(f"Waiting for job {id_}")
62+
63+
current_sleep_interval = ASYNC_JOB_POLL_MIN_INTERVAL
64+
flow_run = self.get_by_id(id_)
65+
while flow_run.completed_at is None:
66+
max_sleep_time = ASYNC_JOB_POLL_MAX_INTERVAL
67+
68+
if timeout is not None:
69+
elapsed = (time.time() - wait_start_time)
70+
if elapsed >= timeout:
71+
raise TimeoutError(f"Timeout after {elapsed} seconds waiting for asynchronous flow run: {id_}")
72+
max_sleep_time = max(ASYNC_JOB_POLL_MIN_INTERVAL, timeout - elapsed)
73+
74+
time.sleep(min(current_sleep_interval, max_sleep_time))
75+
job = self.get_by_id(id_)
76+
current_sleep_interval *= ASYNC_JOB_POLL_BACKOFF_FACTOR
77+
logger.debug(f"\tFlowRun {id_} progress={flow_run.progress}")
78+
79+
logger.info("FlowRun {} Completed: Status: {}".format(id_, flow_run.status))
80+
81+
if flow_run.status == "Success":
82+
return flow_run
83+
elif flow_run.status == "Failed":
84+
raise FlowRunFailedException(flow_run)
85+
elif flow_run.finish_code in ["Canceled", "Cancelled"]:
86+
raise FlowRunCanceledException(flow_run)
87+
else:
88+
raise AssertionError("Unexpected status in flow_run", flow_run)

0 commit comments

Comments
 (0)
0