8000 Rewrite flow_run to use ExponentialBackoffTimer · tableau/server-client-python@6627762 · GitHub
[go: up one dir, main page]

Skip to content

Commit 6627762

Browse files
committed
Rewrite flow_run to use ExponentialBackoffTimer
1 parent e9b9703 commit 6627762

File tree

1 file changed

+11
-23
lines changed

1 file changed

+11
-23
lines changed

tableauserverclient/server/endpoint/flow_runs_endpoint.py

Lines changed: 11 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,12 @@
11
from .endpoint import Endpoint, QuerysetEndpoint, api
2-
from .. import FlowRunItem, PaginationItem
32
from .exceptions import FlowRunFailedException, FlowRunCanceledException
4-
import time
3+
from .. import FlowRunItem, PaginationItem
4+
from ...exponential_backoff import ExponentialBackoffTimer
55

66
import logging
77

88
logger = logging.getLogger("tableau.endpoint.flowruns")
99

10-
# Polling for job completion uses exponential backoff for the sleep intervals between polls
11-
ASYNC_JOB_POLL_MIN_INTERVAL=0.5
12-
ASYNC_JOB_POLL_MAX_INTERVAL=30
13-
ASYNC_JOB_POLL_BACKOFF_FACTOR=1.4
1410

1511
class FlowRuns(QuerysetEndpoint):
1612
def __init__(self, parent_srv):
@@ -56,25 +52,17 @@ def cancel(self, flow_run_id):
5652

5753
@api(version="3.10")
5854
def wait_for_job(self, flow_run_id, *, timeout=None):
59-
id_ = getattr(flow_run_id, "id", flow_run_id)
60-
wait_start_time = time.time()
61-
logger.debug(f"Waiting for job {id_}")
55+
if isinstance(flow_run_id, FlowRunItem):
56+
flow_run_id = flow_run_id.id
57+
assert isinstance(flow_run_id, str)
58+
logger.debug(f"Waiting for flow run {flow_run_id}")
6259

63-
current_sleep_interval = ASYNC_JOB_POLL_MIN_INTERVAL
64-
flow_run = self.get_by_id(id_)
60+
backoffTimer = ExponentialBackoffTimer(timeout=timeout)
61+
flow_run = self.get_by_id(flow_run_id)
6562
while flow_run.completed_at is None:
66-
max_sleep_time = ASYNC_JOB_POLL_MAX_INTERVAL
67-
68-
if timeout is not None:
69-
elapsed = (time.time() - wait_start_time)
70-
if elapsed >= timeout:
71-
raise TimeoutError(f"Timeout after {elapsed} seconds waiting for asynchronous flow run: {id_}")
72-
max_sleep_time = max(ASYNC_JOB_POLL_MIN_INTERVAL, timeout - elapsed)
73-
74-
time.sleep(min(current_sleep_interval, max_sleep_time))
75-
job = self.get_by_id(id_)
76-
current_sleep_interval *= ASYNC_JOB_POLL_BACKOFF_FACTOR
77-
logger.debug(f"\tFlowRun {id_} progress={flow_run.progress}")
63+
backoffTimer.sleep()
64+
flow_run = self.get_by_id(flow_run_id)
65+
logger.debug(f"\tFlowRun {flow_run_id} progress={flow_run.progress}")
7866

7967
logger.info("FlowRun {} Completed: Status: {}".format(id_, flow_run.status))
8068

0 commit comments

Comments
 (0)
0