|
1 | 1 | from .endpoint import Endpoint, QuerysetEndpoint, api
|
2 |
| -from .. import FlowRunItem, PaginationItem |
3 | 2 | from .exceptions import FlowRunFailedException, FlowRunCanceledException
|
4 |
| -import time |
| 3 | +from .. import FlowRunItem, PaginationItem |
| 4 | +from ...exponential_backoff import ExponentialBackoffTimer |
5 | 5 |
|
6 | 6 | import logging
|
7 | 7 |
|
8 | 8 | logger = logging.getLogger("tableau.endpoint.flowruns")
|
9 | 9 |
|
10 |
| -# Polling for job completion uses exponential backoff for the sleep intervals between polls |
11 |
| -ASYNC_JOB_POLL_MIN_INTERVAL=0.5 |
12 |
| -ASYNC_JOB_POLL_MAX_INTERVAL=30 |
13 |
| -ASYNC_JOB_POLL_BACKOFF_FACTOR=1.4 |
14 | 10 |
|
15 | 11 | class FlowRuns(QuerysetEndpoint):
|
16 | 12 | def __init__(self, parent_srv):
|
@@ -56,25 +52,17 @@ def cancel(self, flow_run_id):
|
56 | 52 |
|
57 | 53 | @api(version="3.10")
|
58 | 54 | def wait_for_job(self, flow_run_id, *, timeout=None):
|
59 |
| - id_ = getattr(flow_run_id, "id", flow_run_id) |
60 |
| - wait_start_time = time.time() |
61 |
| - logger.debug(f"Waiting for job {id_}") |
| 55 | + if isinstance(flow_run_id, FlowRunItem): |
| 56 | + flow_run_id = flow_run_id.id |
| 57 | + assert isinstance(flow_run_id, str) |
| 58 | + logger.debug(f"Waiting for flow run {flow_run_id}") |
62 | 59 |
|
63 |
| - current_sleep_interval = ASYNC_JOB_POLL_MIN_INTERVAL |
64 |
| - flow_run = self.get_by_id(id_) |
| 60 | + backoffTimer = ExponentialBackoffTimer(timeout=timeout) |
| 61 | + flow_run = self.get_by_id(flow_run_id) |
65 | 62 | while flow_run.completed_at is None:
|
66 |
| - max_sleep_time = ASYNC_JOB_POLL_MAX_INTERVAL |
67 |
| - |
68 |
| - if timeout is not None: |
69 |
| - elapsed = (time.time() - wait_start_time) |
70 |
| - if elapsed >= timeout: |
71 |
| - raise TimeoutError(f"Timeout after {elapsed} seconds waiting for asynchronous flow run: {id_}") |
72 |
| - max_sleep_time = max(ASYNC_JOB_POLL_MIN_INTERVAL, timeout - elapsed) |
73 |
| - |
74 |
| - time.sleep(min(current_sleep_interval, max_sleep_time)) |
75 |
| - job = self.get_by_id(id_) |
76 |
| - current_sleep_interval *= ASYNC_JOB_POLL_BACKOFF_FACTOR |
77 |
| - logger.debug(f"\tFlowRun {id_} progress={flow_run.progress}") |
| 63 | + backoffTimer.sleep() |
| 64 | + flow_run = self.get_by_id(flow_run_id) |
| 65 | + logger.debug(f"\tFlowRun {flow_run_id} progress={flow_run.progress}") |
78 | 66 |
|
79 | 67 | logger.info("FlowRun {} Completed: Status: {}".format(id_, flow_run.status))
|
80 | 68 |
|
|
0 commit comments