Add support for scheduling Data Update jobs (#891) · ramdesh/server-client-python@7c03396 · GitHub

Commit 7c03396

Add support for scheduling Data Update jobs (tableau#891)
This commit adds support for the `datasources/<id>/data` endpoint, through which one can schedule jobs to update the data within a published live-to-Hyper datasource on the server. The new `datasources.update_data` method expects the following arguments:

* a datasource or a connection: If the datasource contains only a single connection, the datasource is sufficient to identify which Hyper file should be updated. For datasources with multiple connections, the connection has to be provided. This distinction happens on the server, so the client library only needs to provide a way to specify either of the two.
* a `request_id`, which is used to ensure idempotency on the server. This parameter is simply passed through as an HTTP header.
* an `actions` list, specifying how exactly the data on the server should be modified. We expect the caller to provide a list following the structure documented in the REST API documentation. TSC does not validate this object and simply passes it through to the server.
* an optional `payload` file: For actions like `insert`, one can provide a Hyper file which contains the newly inserted tuples or other payload data. TSC will upload this file to the server and then hand it over to the update-API endpoint.

Besides the addition of `datasources.update_data` itself, this commit also adds some infrastructure changes, e.g., to enable sending PATCH requests and HTTP headers.
1 parent 5e38225 commit 7c03396
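
For orientation, a hedged usage sketch of the `insert` path with a `payload` file (not part of this commit): the server URL, token credentials, datasource LUID, and `new_rows.hyper` file below are placeholders, and the `Extract`/`Extract` source/target names assume the default schema and table of a published .hyper extract.

import uuid
import tableauserverclient as TSC

# Placeholder server address and personal access token.
server = TSC.Server('https://tableau.example.com', use_server_version=True)
auth = TSC.PersonalAccessTokenAuth('my-token-name', 'my-token-value')

with server.auth.sign_in(auth):
    # Append all tuples from the payload file into the published extract.
    actions = [{
        "action": "insert",
        "source-schema": "Extract",
        "source-table": "Extract",
        "target-schema": "Extract",
        "target-table": "Extract",
    }]
    job = server.datasources.update_data(
        '00000000-0000-0000-0000-000000000000',   # placeholder datasource LUID
        request_id=str(uuid.uuid4()),             # fresh id; reuse it only to retry safely
        actions=actions,
        payload='new_rows.hyper')                 # hypothetical local Hyper file
    print(job)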

File tree

5 files changed: +209 −2 lines

samples/update_datasource_data.py

Lines changed: 74 additions & 0 deletions
####
# This script demonstrates how to update the data within a published
# live-to-Hyper datasource on server.
#
# The sample is hardcoded against the `World Indicators` dataset and
# expects to receive the LUID of a published datasource containing
# that data. To create such a published datasource, you can use:
# ./publish_datasource.py --file ../test/assets/World\ Indicators.hyper
# which will print you the LUID of the datasource.
#
# Before running this script, the datasource will contain a region `Europe`.
# After running this script, that region will be gone.
#
####

import argparse
import uuid
import logging

import tableauserverclient as TSC


def main():
    parser = argparse.ArgumentParser(description='Delete the `Europe` region from a published `World Indicators` datasource.')
    # Common options; please keep those in sync across all samples
    parser.add_argument('--server', '-s', required=True, help='server address')
    parser.add_argument('--site', '-S', help='site name')
    parser.add_argument('--token-name', '-p', required=True,
                        help='name of the personal access token used to sign into the server')
    parser.add_argument('--token-value', '-v', required=True,
                        help='value of the personal access token used to sign into the server')
    parser.add_argument('--logging-level', '-l', choices=['debug', 'info', 'error'], default='error',
                        help='desired logging level (set to error by default)')
    # Options specific to this sample
    parser.add_argument('datasource_id', help="The LUID of the `World Indicators` datasource")

    args = parser.parse_args()

    # Set logging level based on user input, or error by default
    logging_level = getattr(logging, args.logging_level.upper())
    logging.basicConfig(level=logging_level)

    tableau_auth = TSC.PersonalAccessTokenAuth(args.token_name, args.token_value, site_id=args.site)
    server = TSC.Server(args.server, use_server_version=True)
    with server.auth.sign_in(tableau_auth):
        # We use a unique `request_id` for every request.
        # In case the submission of the update job fails, we won't know whether the job was submitted
        # or not. It could be that the server received the request, changed the data, but then the
        # network connection broke down.
        # If you want to have a way to retry, e.g., inserts while making sure they aren't duplicated,
        # you need to use `request_id` for that purpose.
        # In our case, we don't care about retries. And the delete is idempotent anyway.
        # Hence, we simply use a randomly generated request id.
        request_id = str(uuid.uuid4())

        # This action will delete all rows with `Region=Europe` from the published data source.
        # Other actions (inserts, updates, ...) are also available. For more information see
        # https://help.tableau.com/current/api/rest_api/en-us/REST/rest_api_how_to_update_data_to_hyper.htm
        actions = [
            {
                "action": "delete",
                "target-table": "Extract",
                "target-schema": "Extract",
                "condition": {"op": "eq", "target-col": "Region", "const": {"type": "string", "v": "Europe"}}
            }
        ]

        job = server.datasources.update_data(args.datasource_id, request_id=request_id, actions=actions)

        # TODO: Add a flag that will poll and wait for the returned job to be done
        print(job)


if __name__ == '__main__':
    main()
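
The sample leaves polling as a TODO. Assuming the installed TSC version ships `server.jobs.wait_for_job` (an assumption about the surrounding library, not something added by this commit), a minimal sketch of waiting for the job:

# Hedged sketch: block until the update job completes; raises on failure or timeout.
# Assumes `server` and `job` from the sample above are still in scope.
job = server.jobs.wait_for_job(job, timeout=300)
print("Update job finished with finish_code", job.finish_code)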

tableauserverclient/server/endpoint/datasources_endpoint.py

Lines changed: 29 additions & 0 deletions
@@ -18,6 +18,7 @@
 import copy
 import cgi
 from contextlib import closing
+import json

 # The maximum size of a file that can be published in a single request is 64MB
 FILESIZE_LIMIT = 1024 * 1024 * 64  # 64MB
@@ -282,6 +283,34 @@ def publish(
         logger.info("Published {0} (ID: {1})".format(filename, new_datasource.id))
         return new_datasource

+    @api(version="3.13")
+    def update_data(self, datasource_or_connection_item, *, request_id, actions, payload=None):
+        if isinstance(datasource_or_connection_item, DatasourceItem):
+            datasource_id = datasource_or_connection_item.id
+            url = "{0}/{1}/data".format(self.baseurl, datasource_id)
+        elif isinstance(datasource_or_connection_item, ConnectionItem):
+            datasource_id = datasource_or_connection_item.datasource_id
+            connection_id = datasource_or_connection_item.id
+            url = "{0}/{1}/connections/{2}/data".format(self.baseurl, datasource_id, connection_id)
+        else:
+            assert isinstance(datasource_or_connection_item, str)
+            url = "{0}/{1}/data".format(self.baseurl, datasource_or_connection_item)
+
+        if payload is not None:
+            if not os.path.isfile(payload):
+                error = "File path does not lead to an existing file."
+                raise IOError(error)
+
+            logger.info("Uploading {0} to server with chunking method for Update job".format(payload))
+            upload_session_id = self.parent_srv.fileuploads.upload(payload)
+            url = "{0}?uploadSessionId={1}".format(url, upload_session_id)
+
+        json_request = json.dumps({"actions": actions})
+        parameters = {"headers": {"requestid": request_id}}
+        server_response = self.patch_request(url, json_request, "application/json", parameters=parameters)
+        new_job = JobItem.from_response(server_response.content, self.parent_srv.namespace)[0]
+        return new_job
+
     @api(version="2.0")
     def populate_permissions(self, item):
         self._permissions.populate(item)
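
Since `update_data` dispatches on the item type, here is a hedged sketch (placeholder LUID; assumes `server` is an already signed-in TSC.Server) of targeting one connection of a multi-connection datasource:

import uuid

# Fetch the datasource and its connections; the LUID below is a placeholder.
datasource = server.datasources.get_by_id('00000000-0000-0000-0000-000000000000')
server.datasources.populate_connections(datasource)

# With multiple connections, the server needs the specific connection to know
# which Hyper file to modify, so pass the ConnectionItem instead of the datasource.
connection = datasource.connections[0]
actions = [{"action": "delete", "target-table": "Extract", "target-schema": "Extract",
            "condition": {"op": "eq", "target-col": "Region", "const": {"type": "string", "v": "Europe"}}}]
job = server.datasources.update_data(connection, request_id=str(uuid.uuid4()), actions=actions)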

tableauserverclient/server/endpoint/endpoint.py

Lines changed: 15 additions & 2 deletions
@@ -55,7 +55,9 @@ def _make_request(
     ):
         parameters = parameters or {}
         parameters.update(self.parent_srv.http_options)
-        parameters["headers"] = Endpoint._make_common_headers(auth_token, content_type)
+        if "headers" not in parameters:
+            parameters["headers"] = {}
+        parameters["headers"].update(Endpoint._make_common_headers(auth_token, content_type))

         if content is not None:
             parameters["data"] = content
@@ -118,13 +120,14 @@ def delete_request(self, url):
         # We don't return anything for a delete
         self._make_request(self.parent_srv.session.delete, url, auth_token=self.parent_srv.auth_token)

-    def put_request(self, url, xml_request=None, content_type="text/xml"):
+    def put_request(self, url, xml_request=None, content_type="text/xml", parameters=None):
         return self._make_request(
             self.parent_srv.session.put,
             url,
             content=xml_request,
             auth_token=self.parent_srv.auth_token,
             content_type=content_type,
+            parameters=parameters,
         )

     def post_request(self, url, xml_request, content_type="text/xml", parameters=None):
@@ -137,6 +140,16 @@ def post_request(self, url, xml_request, content_type="text/xml", parameters=None):
             parameters=parameters,
         )

+    def patch_request(self, url, xml_request, content_type="text/xml", parameters=None):
+        return self._make_request(
+            self.parent_srv.session.patch,
+            url,
+            content=xml_request,
+            auth_token=self.parent_srv.auth_token,
+            content_type=content_type,
+            parameters=parameters,
+        )
+

 def api(version):
     """Annotate the minimum supported version for an endpoint.
test/assets/datasource_data_update.xml (file name per `UPDATE_DATA_XML` in the test below)

Lines changed: 9 additions & 0 deletions

<?xml version='1.0' encoding='UTF-8'?>
<tsResponse xmlns="http://tableau.com/api" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tableau.com/api https://help.tableau.com/samples/en-us/rest_api/ts-api_3_14.xsd">
  <job id="5c0ba560-c959-424e-b08a-f32ef0bfb737" mode="Asynchronous" type="UpdateUploadedFile" createdAt="2021-09-18T09:40:12Z">
    <updateUploadedFileJob>
      <datasource id="9dbd2263-16b5-46e1-9c43-a76bb8ab65fb" name="test datasource"/>
      <connectionLuid>7ecaccd8-39b0-4875-a77d-094f6e930019</connectionLuid>
    </updateUploadedFileJob>
  </job>
</tsResponse>
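
A hedged sketch of how this asset flows through the client: `JobItem.from_response` (the same call `update_data` makes on the live response) parses it into a `JobItem`; the asset path assumes the file name inferred above.

import tableauserverclient as TSC

# Parse the canned response exactly the way update_data parses a live one; the
# namespace mapping matches the xmlns declared in the asset.
with open('test/assets/datasource_data_update.xml', 'rb') as f:
    job = TSC.JobItem.from_response(f.read(), {'t': 'http://tableau.com/api'})[0]

print(job.id)    # 5c0ba560-c959-424e-b08a-f32ef0bfb737
print(job.type)  # UpdateUploadedFile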

test/test_datasource.py

Lines changed: 82 additions & 0 deletions
@@ -1,3 +1,4 @@
+from tableauserverclient.server.endpoint.fileuploads_endpoint import Fileuploads
 import unittest
 from io import BytesIO
 import os
@@ -22,6 +23,7 @@
 PUBLISH_XML_ASYNC = 'datasource_publish_async.xml'
 REFRESH_XML = 'datasource_refresh.xml'
 UPDATE_XML = 'datasource_update.xml'
+UPDATE_DATA_XML = 'datasource_data_update.xml'
 UPDATE_CONNECTION_XML = 'datasource_connection_update.xml'


@@ -355,6 +357,86 @@ def test_refresh_object(self):
         # We only check the `id`; remaining fields are already tested in `test_refresh_id`
         self.assertEqual('7c3d599e-949f-44c3-94a1-f30ba85757e4', new_job.id)

+    def test_update_data_datasource_object(self):
+        """Calling `update_data` with a `DatasourceItem` should update that datasource"""
+        self.server.version = "3.13"
+        self.baseurl = self.server.datasources.baseurl
+
+        datasource = TSC.DatasourceItem('')
+        datasource._id = '9dbd2263-16b5-46e1-9c43-a76bb8ab65fb'
+        response_xml = read_xml_asset(UPDATE_DATA_XML)
+        with requests_mock.mock() as m:
+            m.patch(self.baseurl + '/9dbd2263-16b5-46e1-9c43-a76bb8ab65fb/data',
+                    status_code=202, headers={"requestid": "test_id"}, text=response_xml)
+            new_job = self.server.datasources.update_data(datasource, request_id="test_id", actions=[])
+
+        self.assertEqual('5c0ba560-c959-424e-b08a-f32ef0bfb737', new_job.id)
+        self.assertEqual('UpdateUploadedFile', new_job.type)
+        self.assertEqual(None, new_job.progress)
+        self.assertEqual('2021-09-18T09:40:12Z', format_datetime(new_job.created_at))
+        self.assertEqual(-1, new_job.finish_code)
+
+    def test_update_data_connection_object(self):
+        """Calling `update_data` with a `ConnectionItem` should update that connection"""
+        self.server.version = "3.13"
+        self.baseurl = self.server.datasources.baseurl
+
+        connection = TSC.ConnectionItem()
+        connection._datasource_id = '9dbd2263-16b5-46e1-9c43-a76bb8ab65fb'
+        connection._id = '7ecaccd8-39b0-4875-a77d-094f6e930019'
+        response_xml = read_xml_asset(UPDATE_DATA_XML)
+        with requests_mock.mock() as m:
+            m.patch(self.baseurl + '/9dbd2263-16b5-46e1-9c43-a76bb8ab65fb/connections/7ecaccd8-39b0-4875-a77d-094f6e930019/data',
+                    status_code=202, headers={"requestid": "test_id"}, text=response_xml)
+            new_job = self.server.datasources.update_data(connection, request_id="test_id", actions=[])
+
+        # We only check the `id`; remaining fields are already tested in `test_update_data_datasource_object`
+        self.assertEqual('5c0ba560-c959-424e-b08a-f32ef0bfb737', new_job.id)
+
+    def test_update_data_datasource_string(self):
+        """For convenience, calling `update_data` with a `str` should update the datasource with the corresponding UUID"""
+        self.server.version = "3.13"
+        self.baseurl = self.server.datasources.baseurl
+
+        datasource_id = '9dbd2263-16b5-46e1-9c43-a76bb8ab65fb'
+        response_xml = read_xml_asset(UPDATE_DATA_XML)
+        with requests_mock.mock() as m:
+            m.patch(self.baseurl + '/9dbd2263-16b5-46e1-9c43-a76bb8ab65fb/data',
+                    status_code=202, headers={"requestid": "test_id"}, text=response_xml)
+            new_job = self.server.datasources.update_data(datasource_id, request_id="test_id", actions=[])
+
+        # We only check the `id`; remaining fields are already tested in `test_update_data_datasource_object`
+        self.assertEqual('5c0ba560-c959-424e-b08a-f32ef0bfb737', new_job.id)
+
+    def test_update_data_datasource_payload_file(self):
+        """If `payload` is present, we upload it and associate the job with it"""
+        self.server.version = "3.13"
+        self.baseurl = self.server.datasources.baseurl
+
+        datasource_id = '9dbd2263-16b5-46e1-9c43-a76bb8ab65fb'
+        mock_upload_id = '10051:c3e56879876842d4b3600f20c1f79876-0:0'
+        response_xml = read_xml_asset(UPDATE_DATA_XML)
+        with requests_mock.mock() as rm, \
+                unittest.mock.patch.object(Fileuploads, "upload", return_value=mock_upload_id):
+            rm.patch(self.baseurl + '/9dbd2263-16b5-46e1-9c43-a76bb8ab65fb/data?uploadSessionId=' + mock_upload_id,
+                     status_code=202, headers={"requestid": "test_id"}, text=response_xml)
+            new_job = self.server.datasources.update_data(datasource_id, request_id="test_id",
+                                                          actions=[], payload=asset('World Indicators.hyper'))
+
+        # We only check the `id`; remaining fields are already tested in `test_update_data_datasource_object`
+        self.assertEqual('5c0ba560-c959-424e-b08a-f32ef0bfb737', new_job.id)
+
+    def test_update_data_datasource_invalid_payload_file(self):
+        """If `payload` points to a non-existing file, we report an error"""
+        self.server.version = "3.13"
+        self.baseurl = self.server.datasources.baseurl
+        datasource_id = '9dbd2263-16b5-46e1-9c43-a76bb8ab65fb'
+        with self.assertRaises(IOError) as cm:
+            self.server.datasources.update_data(datasource_id, request_id="test_id",
+                                                actions=[], payload='no/such/file.missing')
+        exception = cm.exception
+        self.assertEqual(str(exception), "File path does not lead to an existing file.")
+
     def test_delete(self):
         with requests_mock.mock() as m:
             m.delete(self.baseurl + '/9dbd2263-16b5-46e1-9c43-a76bb8ab65fb', status_code=204)
