1
1
from .endpoint import Endpoint , QuerysetEndpoint , api
2
2
from .. import FlowRunItem , PaginationItem
3
+ from .exceptions import FlowRunFailedException , FlowRunCanceledException
4
+ import time
3
5
4
6
import logging
5
7
6
8
logger = logging .getLogger ("tableau.endpoint.flowruns" )
7
9
10
+ # Polling for job completion uses exponential backoff for the sleep intervals between polls
11
+ ASYNC_JOB_POLL_MIN_INTERVAL = 0.5
12
+ ASYNC_JOB_POLL_MAX_INTERVAL = 30
13
+ ASYNC_JOB_POLL_BACKOFF_FACTOR = 1.4
8
14
9
15
class FlowRuns (QuerysetEndpoint ):
10
16
def __init__ (self , parent_srv ):
@@ -46,3 +52,37 @@ def cancel(self, flow_run_id):
46
52
url = "{0}/{1}" .format (self .baseurl , id_ )
47
53
self .put_request (url )
48
54
logger .info ("Deleted single flow (ID: {0})" .format (id_ ))
55
+
56
+
57
+ @api (version = "3.10" )
58
+ def wait_for_job (self , flow_run_id , * , timeout = None ):
59
+ id_ = getattr (flow_run_id , "id" , flow_run_id )
60
+ wait_start_time = time .time ()
61
+ logger .debug (f"Waiting for job { id_ } " )
62
+
63
+ current_sleep_interval = ASYNC_JOB_POLL_MIN_INTERVAL
64
+ flow_run = self .get_by_id (id_ )
65
+ while flow_run .completed_at is None :
66
+ max_sleep_time = ASYNC_JOB_POLL_MAX_INTERVAL
67
+
68
+ if timeout is not None :
69
+ elapsed = (time .time () - wait_start_time )
70
+ if elapsed >= timeout :
71
+ raise TimeoutError (f"Timeout after { elapsed } seconds waiting for asynchronous flow run: { id_ } " )
72
+ max_sleep_time = max (ASYNC_JOB_POLL_MIN_INTERVAL , timeout - elapsed )
73
+
74
+ time .sleep (min (current_sleep_interval , max_sleep_time ))
75
+ job = self .get_by_id (id_ )
76
+ current_sleep_interval *= ASYNC_JOB_POLL_BACKOFF_FACTOR
77
+ logger .debug (f"\t FlowRun { id_ } progress={ flow_run .progress } " )
78
+
79
+ logger .info ("FlowRun {} Completed: Status: {}" .format (id_ , flow_run .status ))
80
+
81
+ if flow_run .status == "Success" :
82
+ return flow_run
83
+ elif flow_run .status == "Failed" :
84
+ raise FlowRunFailedException (flow_run )
85
+ elif flow_run .finish_code in ["Canceled" , "Cancelled" ]:
86
+ raise FlowRunCanceledException (flow_run )
87
+ else :
88
+ raise AssertionError ("Unexpected status in flow_run" , flow_run )
0 commit comments