Add `jobs.wait_for_job` method (#903) · mattholy/server-client-python@9ccc713 · GitHub
[go: up one dir, main page]

Skip to content

Commit 9ccc713

Browse files
authored
Add jobs.wait_for_job method (tableau#903)
This commit adds a `wait_for_job` method which will repeatedly poll a job's status until that job is finished. Internally, it uses an exponential backoff for the polling intervals. That way, it is snappy for fast-running jobs without putting too much load on the server for long-running jobs. It returns the successfully finished `JobItem` object, which might be of interest to the caller, e.g. to inspect the reported `started_at` and `completed_at` times or the `notes`. For failed or cancelled jobs, `wait_for_job` raises an exception. That way, we ensure that errors in jobs don't accidentally go unnoticed. The `job` object can still be retrieved from the exception object, if required.
1 parent 9ac17e4 commit 9ccc713

File tree

11 files changed

+213
-15
lines changed

11 files changed

+213
-15
lines changed

samples/refresh.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,19 @@ def main():
3939
resource = server.workbooks.get_by_id(args.resource_id)
4040

4141
# trigger the refresh, you'll get a job id back which can be used to poll for when the refresh is done
42-
results = server.workbooks.refresh(args.resource_id)
42+
job = server.workbooks.refresh(args.resource_id)
4343
else:
4444
# Get the datasource by its Id to make sure it exists
4545
resource = server.datasources.get_by_id(args.resource_id)
4646

4747
# trigger the refresh, you'll get a job id back which can be used to poll for when the refresh is done
48-
results = server.datasources.refresh(resource)
49-
50-
print(results)
51-
# TODO: Add a flag that will poll and wait for the returned job to be done
48+
job = server.datasources.refresh(resource)
49+
50+
print(f"Update job posted (ID: {job.id})")
51+
print("Waiting for job...")
52+
# `wait_for_job` will throw if the job isn't executed successfully
53+
job = server.jobs.wait_for_job(job)
54+
print("Job finished succesfully")
5255

5356

5457
if __name__ == '__main__':

samples/update_datasource_data.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,12 @@ def main():
6767

6868
job = server.datasources.update_hyper_data(args.datasource_id, request_id=request_id, actions=actions)
6969

70-
# TODO: Add a flag that will poll and wait for the returned job to be done
71-
print(job)
70+
print(f"Update job posted (ID: {job.id})")
71+
print("Waiting for job...")
72+
# `wait_for_job` will throw if the job isn't executed successfully
73+
job = server.jobs.wait_for_job(job)
74+
print("Job finished succesfully")
75+
7276

7377
if __name__ == '__main__':
7478
main()
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import time

# Polling for server-side events (such as job completion) uses exponential backoff for the sleep intervals between polls
ASYNC_POLL_MIN_INTERVAL = 0.5
ASYNC_POLL_MAX_INTERVAL = 30
ASYNC_POLL_BACKOFF_FACTOR = 1.4


class ExponentialBackoffTimer:
    """Sleep helper whose sleep interval grows geometrically from call to call.

    The first call to `sleep` sleeps for `min_interval` seconds; every further
    call multiplies the interval by `backoff_factor` until it saturates at
    `max_interval`. If a `timeout` is given, `sleep` raises `TimeoutError`
    once that many seconds have passed since the timer was constructed.

    All arguments are keyword-only.

    :param timeout: overall time budget in seconds, or None for no limit
    :param min_interval: first (and shortest) sleep interval in seconds
    :param max_interval: upper bound for a single sleep interval in seconds
    :param backoff_factor: multiplier applied to the interval after each sleep
    :raises ValueError: if the interval configuration is inconsistent
    """

    def __init__(
        self,
        *,
        timeout=None,
        min_interval=ASYNC_POLL_MIN_INTERVAL,
        max_interval=ASYNC_POLL_MAX_INTERVAL,
        backoff_factor=ASYNC_POLL_BACKOFF_FACTOR,
    ):
        if min_interval <= 0:
            raise ValueError("min_interval must be positive")
        if max_interval < min_interval:
            raise ValueError("max_interval must not be smaller than min_interval")
        if backoff_factor < 1:
            raise ValueError("backoff_factor must be at least 1")

        # NOTE: the wall clock (`time.time`) is used on purpose; the test helper
        # `mocked_time` patches exactly `time.time` and `time.sleep`.
        self.start_time = time.time()
        self.timeout = timeout
        self.min_interval = min_interval
        self.max_interval = max_interval
        self.backoff_factor = backoff_factor
        self.current_sleep_interval = min_interval

    def sleep(self):
        """Sleep for the current backoff interval and then enlarge the interval.

        :raises TimeoutError: if the timeout had already elapsed when called
        """
        max_sleep_time = self.max_interval
        if self.timeout is not None:
            elapsed = time.time() - self.start_time
            if elapsed >= self.timeout:
                raise TimeoutError(f"Timeout after {elapsed} seconds waiting for asynchronous event")
            remaining_time = self.timeout - elapsed
            # Usually, we would sleep for up to `max_interval`, but we don't want to sleep over the timeout
            max_sleep_time = min(self.max_interval, remaining_time)
            # We want to sleep at least for `min_interval`. This is important to ensure that, as we get
            # closer to the timeout, we don't accidentally wake up multiple times and hit the server in
            # rapid succession due to waking up too early from the `sleep`.
            max_sleep_time = max(max_sleep_time, self.min_interval)

        time.sleep(min(self.current_sleep_interval, max_sleep_time))
        # Saturate at `max_interval` so that the stored interval does not grow without bound;
        # the time actually slept is unaffected because it is capped above anyway.
        self.current_sleep_interval = min(self.current_sleep_interval * self.backoff_factor, self.max_interval)

tableauserverclient/models/job_item.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,16 @@
33

44

55
class JobItem(object):
6+
class FinishCode:
7+
"""
8+
Status codes as documented on
9+
https://help.tableau.com/current/api/rest_api/en-us/REST/rest_api_ref_jobs_tasks_and_schedules.htm#query_job
10+
"""
11+
Success = 0
12+
Failed = 1
13+
Cancelled = 2
14+
15+
616
def __init__(
717
self,
818
id_,
@@ -89,7 +99,7 @@ def _parse_element(cls, element, ns):
8999
created_at = parse_datetime(element.get("createdAt", None))
90100
started_at = parse_datetime(element.get("startedAt", None))
91101
completed_at = parse_datetime(element.get("completedAt", None))
92-
finish_code = element.get("finishCode", -1)
102+
finish_code = int(element.get("finishCode", -1))
93103
notes = [note.text for note in element.findall(".//t:notes", namespaces=ns)] or None
94104
mode = element.get("mode", None)
95105
return cls(

tableauserverclient/server/endpoint/exceptions.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,3 +64,16 @@ def __str__(self):
6464
from pprint import pformat
6565

6666
return pformat(self.error)
67+
68+
69+
class JobFailedException(Exception):
70+
def __init__(self, job):
71+
self.notes = job.notes
72+
self.job = job
73+
74+
def __str__(self):
75+
return f"Job {self.job.id} failed with notes {self.notes}"
76+
77+
78+
class JobCanceledException(JobFailedException):
79+
pass

tableauserverclient/server/endpoint/jobs_endpoint.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
from .endpoint import Endpoint, api
2+
from .exceptions import JobCanceledException, JobFailedException
23
from .. import JobItem, BackgroundJobItem, PaginationItem
34
from ..request_options import RequestOptionsBase
5+
from ...exponential_backoff import ExponentialBackoffTimer
46

57
import logging
68

@@ -12,7 +14,6 @@
1214

1315
logger = logging.getLogger("tableau.endpoint.jobs")
1416

15-
1617
class Jobs(Endpoint):
1718
@property
1819
def baseurl(self):
@@ -48,3 +49,27 @@ def get_by_id(self, job_id):
4849
server_response = self.get_request(url)
4950
new_job = JobItem.from_response(server_response.content, self.parent_srv.namespace)[0]
5051
return new_job
52+
53+
def wait_for_job(self, job_id, *, timeout=None):
    """Poll a job until it has completed and return the finished job.

    The job is re-fetched via `get_by_id` with exponentially growing pauses
    between polls until its `completed_at` is set.

    :param job_id: the job's id as a string, or a `JobItem` whose `id` is used
    :param timeout: maximum number of seconds to wait, or None to wait indefinitely
    :returns: the `JobItem` as last fetched, if it finished successfully
    :raises TypeError: if `job_id` is neither a string nor a `JobItem`
    :raises TimeoutError: if the job did not complete within `timeout` seconds
    :raises JobFailedException: if the job finished with finish code `Failed`
    :raises JobCanceledException: if the job finished with finish code `Cancelled`
    :raises AssertionError: if the server reported an unknown finish code
    """
    if isinstance(job_id, JobItem):
        job_id = job_id.id
    # Validate explicitly instead of with `assert`, which is stripped under `python -O`
    if not isinstance(job_id, str):
        raise TypeError(f"job_id must be a str or a JobItem, not {type(job_id).__name__}")
    logger.debug("Waiting for job %s", job_id)

    backoffTimer = ExponentialBackoffTimer(timeout=timeout)
    job = self.get_by_id(job_id)
    while job.completed_at is None:
        # `sleep` raises TimeoutError once the timeout has elapsed, which ends the loop
        backoffTimer.sleep()
        job = self.get_by_id(job_id)
        logger.debug("\tJob %s progress=%s", job_id, job.progress)

    logger.info("Job %s Completed: Finish Code: %s - Notes:%s", job_id, job.finish_code, job.notes)

    if job.finish_code == JobItem.FinishCode.Success:
        return job
    elif job.finish_code == JobItem.FinishCode.Failed:
        raise JobFailedException(job)
    elif job.finish_code == JobItem.FinishCode.Cancelled:
        raise JobCanceledException(job)
    else:
        raise AssertionError("Unexpected finish_code in job", job)

test/_utils.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from contextlib import contextmanager
2+
import unittest
13
import os.path
24

35
TEST_ASSET_DIR = os.path.join(os.path.dirname(__file__), 'assets')
@@ -14,3 +16,19 @@ def read_xml_asset(filename):
1416

1517
def read_xml_assets(*args):
1618
return map(read_xml_asset, args)
19+
20+
21+
@contextmanager
def mocked_time():
    """Context manager that replaces `time.sleep` and `time.time` with a fake clock.

    The fake clock starts at 0. Inside the `with` block, `time.sleep(interval)`
    returns immediately and advances the clock by `interval`, and `time.time()`
    returns the current fake clock value. The originals are restored on exit.

    :yields: a zero-argument callable returning the current fake time
    """
    # Import the submodule explicitly: a bare `import unittest` does not load
    # `unittest.mock`, so `unittest.mock.patch` only works if some other module
    # happens to have imported it already.
    from unittest import mock

    mock_time = 0

    def sleep_mock(interval):
        nonlocal mock_time
        mock_time += interval

    def get_time():
        return mock_time

    with mock.patch("time.sleep", sleep_mock), mock.patch("time.time", get_time):
        yield get_time

test/test_datasource.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ def test_publish_async(self):
317317
self.assertEqual('PublishDatasource', new_job.type)
318318
self.assertEqual('0', new_job.progress)
319319
self.assertEqual('2018-06-30T00:54:54Z', format_datetime(new_job.created_at))
320-
self.assertEqual('1', new_job.finish_code)
320+
self.assertEqual(1, new_job.finish_code)
321321

322322
def test_publish_unnamed_file_object(self):
323323
new_datasource = TSC.DatasourceItem('test')

test/test_exponential_backoff.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import unittest
2+
from ._utils import mocked_time
3+
from tableauserverclient.exponential_backoff import ExponentialBackoffTimer
4+
5+
6+
class ExponentialBackoffTests(unittest.TestCase):
7+
def test_exponential(self):
8+
with mocked_time() as mock_time:
9+
exponentialBackoff = ExponentialBackoffTimer()
10+
# The creation of our mock shouldn't sleep
11+
self.assertAlmostEqual(mock_time(), 0)
12+
# The first sleep sleeps for a rather short time, the following sleeps become longer
13+
exponentialBackoff.sleep()
14+
self.assertAlmostEqual(mock_time(), 0.5)
15+
exponentialBackoff.sleep()
16+
self.assertAlmostEqual(mock_time(), 1.2)
17+
exponentialBackoff.sleep()
18+
self.assertAlmostEqual(mock_time(), 2.18)
19+
exponentialBackoff.sleep()
20+
self.assertAlmostEqual(mock_time(), 3.552)
21+
exponentialBackoff.sleep()
22+
self.assertAlmostEqual(mock_time(), 5.4728)
23+
24+
25+
def test_exponential_saturation(self):
26+
with mocked_time() as mock_time:
27+
exponentialBackoff = ExponentialBackoffTimer()
28+
for _ in range(99):
29+
exponentialBackoff.sleep()
30+
# We don't increase the sleep time above 30 seconds.
31+
# Otherwise, the exponential sleep time could easily
32+
# reach minutes or even hours between polls
33+
for _ in range(5):
34+
s = mock_time()
35+
exponentialBackoff.sleep()
36+
slept = mock_time() - s
37+
self.assertAlmostEqual(slept, 30)
38+
39+
40+
def test_timeout(self):
41+
with mocked_time() as mock_time:
42+
exponentialBackoff = ExponentialBackoffTimer(timeout=4.5)
43+
for _ in range(4):
44+
exponentialBackoff.sleep()
45+
self.assertAlmostEqual(mock_time(), 3.552)
46+
# Usually, the following sleep would sleep until 5.5, but due to
47+
# the timeout we wait less; thereby we make sure to take the timeout
48+
# into account as well as possible
49+
exponentialBackoff.sleep()
50+
self.assertAlmostEqual(mock_time(), 4.5)
51+
# The next call to `sleep` will raise a TimeoutError
52+
with self.assertRaises(TimeoutError):
53+
exponentialBackoff.sleep()
54+
55+
56+
def test_timeout_zero(self):
57+
with mocked_time() as mock_time:
58+
# The construction of the timer doesn't throw, yet
59+
exponentialBackoff = ExponentialBackoffTimer(timeout = 0)
60+
# But the first `sleep` immediately throws
61+
with self.assertRaises(TimeoutError):
62+
exponentialBackoff.sleep()

test/test_job.py

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,16 @@
44
import requests_mock
55
import tableauserverclient as TSC
66
from tableauserverclient.datetime_helpers import utc
7-
from ._utils import read_xml_asset
7+
from tableauserverclient.server.endpoint.exceptions import JobFailedException
8+
from ._utils import read_xml_asset, mocked_time
89

910
TEST_ASSET_DIR = os.path.join(os.path.dirname(__file__), 'assets')
1011

1112
GET_XML = 'job_get.xml'
1213
GET_BY_ID_XML = 'job_get_by_id.xml'
14+
GET_BY_ID_FAILED_XML = 'job_get_by_id_failed.xml'
15+
GET_BY_ID_CANCELLED_XML = 'job_get_by_id_cancelled.xml'
16+
GET_BY_ID_INPROGRESS_XML = 'job_get_by_id_inprogress.xml'
1317

1418

1519
class JobTests(unittest.TestCase):
@@ -49,9 +53,6 @@ def test_get_by_id(self):
4953
m.get('{0}/{1}'.format(self.baseurl, job_id), text=response_xml)
5054
job = self.server.jobs.get_by_id(job_id)
5155

52-
created_at = datetime(2020, 5, 13, 20, 23, 45, tzinfo=utc)
53-
updated_at = datetime(2020, 5, 13, 20, 25, 18, tzinfo=utc)
54-
ended_at = datetime(2020, 5, 13, 20, 25, 18, tzinfo=utc)
5556
self.assertEqual(job_id, job.id)
5657
self.assertListEqual(job.notes, ['Job detail notes'])
5758

@@ -72,3 +73,35 @@ def test_cancel_item(self):
7273
with requests_mock.mock() as m:
7374
m.put(self.baseurl + '/ee8c6e70-43b6-11e6-af4f-f7b0d8e20760', status_code=204)
7475
self.server.jobs.cancel(job)
76+
77+
78+
def test_wait_for_job_finished(self):
79+
# Waiting for an already finished job, directly returns that job's info
80+
response_xml = read_xml_asset(GET_BY_ID_XML)
81+
job_id = '2eef4225-aa0c-41c4-8662-a76d89ed7336'
82+
with mocked_time(), requests_mock.mock() as m:
83+
m.get('{0}/{1}'.format(self.baseurl, job_id), text=response_xml)
84+
job = self.server.jobs.wait_for_job(job_id)
85+
86+
self.assertEqual(job_id, job.id)
87+
self.assertListEqual(job.notes, ['Job detail notes'])
88+
89+
90+
def test_wait_for_job_failed(self):
91+
# Waiting for a failed job raises an exception
92+
response_xml = read_xml_asset(GET_BY_ID_FAILED_XML)
93+
job_id = '77d5e57a-2517-479f-9a3c-a32025f2b64d'
94+
with mocked_time(), requests_mock.mock() as m:
95+
m.get('{0}/{1}'.format(self.baseurl, job_id), text=response_xml)
96+
with self.assertRaises(JobFailedException):
97+
self.server.jobs.wait_for_job(job_id)
98+
99+
100+
def test_wait_for_job_timeout(self):
101+
# Waiting for a job which doesn't terminate will throw an exception
102+
response_xml = read_xml_asset(GET_BY_ID_INPROGRESS_XML)
103+
job_id = '77d5e57a-2517-479f-9a3c-a32025f2b64d'
104+
with mocked_time(), requests_mock.mock() as m:
105+
m.get('{0}/{1}'.format(self.baseurl, job_id), text=response_xml)
106+
with self.assertRaises(TimeoutError):
107+
self.server.jobs.wait_for_job(job_id, timeout=30)

test/test_workbook.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -616,7 +616,7 @@ def test_publish_async(self):
616616
self.assertEqual('PublishWorkbook', new_job.type)
617617
self.assertEqual('0', new_job.progress)
618618
self.assertEqual('2018-06-29T23:22:32Z', format_datetime(new_job.created_at))
619-
self.assertEqual('1', new_job.finish_code)
619+
self.assertEqual(1, new_job.finish_code)
620620

621621
def test_publish_invalid_file(self):
622622
new_workbook = TSC.WorkbookItem('test', 'ee8c6e70-43b6-11e6-af4f-f7b0d8e20760')

0 commit comments

Comments
 (0)
0