From bdce9822ffbac122b5a7072497fe1e841084c012 Mon Sep 17 00:00:00 2001 From: "liu.r" Date: Tue, 7 May 2024 21:41:32 -0700 Subject: [PATCH 1/9] Add Cloud Flow Task endpoint --- tableauserverclient/models/task_item.py | 1 + .../server/endpoint/flow_task_endpoint.py | 29 +++++++++ tableauserverclient/server/request_factory.py | 37 +++++++++++ tableauserverclient/server/server.py | 2 + test/test_flowtask.py | 61 +++++++++++++++++++ 5 files changed, 130 insertions(+) create mode 100644 tableauserverclient/server/endpoint/flow_task_endpoint.py create mode 100644 test/test_flowtask.py diff --git a/tableauserverclient/models/task_item.py b/tableauserverclient/models/task_item.py index 0ffc3bfab..01cfcfb11 100644 --- a/tableauserverclient/models/task_item.py +++ b/tableauserverclient/models/task_item.py @@ -18,6 +18,7 @@ class Type: _TASK_TYPE_MAPPING = { "RefreshExtractTask": Type.ExtractRefresh, "MaterializeViewsTask": Type.DataAcceleration, + "RunFlowTask": Type.RunFlow, } def __init__( diff --git a/tableauserverclient/server/endpoint/flow_task_endpoint.py b/tableauserverclient/server/endpoint/flow_task_endpoint.py new file mode 100644 index 000000000..1e53b22f1 --- /dev/null +++ b/tableauserverclient/server/endpoint/flow_task_endpoint.py @@ -0,0 +1,29 @@ +import logging +from typing import List, Optional, Tuple, TYPE_CHECKING + +from tableauserverclient.server.endpoint.endpoint import Endpoint, api +from tableauserverclient.server.endpoint.exceptions import MissingRequiredFieldError +from tableauserverclient.models import TaskItem, PaginationItem +from tableauserverclient.server import RequestFactory + +from tableauserverclient.helpers.logging import logger + +if TYPE_CHECKING: + from tableauserverclient.server.request_options import RequestOptions + + +class FlowTasks(Endpoint): + @property + def baseurl(self) -> str: + return "{0}/sites/{1}/tasks/flows".format(self.parent_srv.baseurl, self.parent_srv.site_id) + + @api(version="3.22") + def create(self, flow_item: TaskItem) -> TaskItem: + if not flow_item: + error = "No flow provided" + raise ValueError(error) + logger.info("Creating an flow task %s", flow_item) + url = self.baseurl + create_req = RequestFactory.Task.create_flow_task_req(flow_item) + server_response = self.post_request(url, create_req) + return server_response.content \ No newline at end of file diff --git a/tableauserverclient/server/request_factory.py b/tableauserverclient/server/request_factory.py index 1f6dfbfc6..904df1215 100644 --- a/tableauserverclient/server/request_factory.py +++ b/tableauserverclient/server/request_factory.py @@ -1113,6 +1113,43 @@ def create_extract_req(self, xml_request: ET.Element, extract_item: "TaskItem") return ET.tostring(xml_request) +class FlowTaskRequest(object): + @_tsrequest_wrapped + def run_req(self, xml_request, task_item): + # Send an empty tsRequest + pass + + @_tsrequest_wrapped + def create_flow_task_req(self, xml_request: ET.Element, flow_item: "TaskItem") -> bytes: + flow_element = ET.SubElement(xml_request, "runFlow") + + # Main attributes + flow_element.attrib["type"] = flow_item.task_type + + if flow_item.target is not None: + target_element = ET.SubElement(flow_element, flow_item.target.type) + target_element.attrib["id"] = flow_item.target.id + + if flow_item.schedule_item is None: + return ET.tostring(xml_request) + + # Schedule attributes + schedule_element = ET.SubElement(xml_request, "schedule") + + interval_item = flow_item.schedule_item.interval_item + schedule_element.attrib["frequency"] = interval_item._frequency + frequency_element = ET.SubElement(schedule_element, "frequencyDetails") + frequency_element.attrib["start"] = str(interval_item.start_time) + if hasattr(interval_item, "end_time") and interval_item.end_time is not None: + frequency_element.attrib["end"] = str(interval_item.end_time) + if hasattr(interval_item, "interval") and interval_item.interval: + intervals_element = ET.SubElement(frequency_element, "intervals") + for interval in interval_item._interval_type_pairs(): # type: ignore + expression, value = interval + single_interval_element = ET.SubElement(intervals_element, "interval") + single_interval_element.attrib[expression] = value + + return ET.tostring(xml_request) class SubscriptionRequest(object): @_tsrequest_wrapped diff --git a/tableauserverclient/server/server.py b/tableauserverclient/server/server.py index ee23789b1..3a6831458 100644 --- a/tableauserverclient/server/server.py +++ b/tableauserverclient/server/server.py @@ -25,6 +25,7 @@ Databases, Tables, Flows, + FlowTasks, Webhooks, DataAccelerationReport, Favorites, @@ -82,6 +83,7 @@ def __init__(self, server_address, use_server_version=False, http_options=None, self.datasources = Datasources(self) self.favorites = Favorites(self) self.flows = Flows(self) + self.flow_tasks = FlowTasks(self) self.projects = Projects(self) self.schedules = Schedules(self) self.server_info = ServerInfo(self) diff --git a/test/test_flowtask.py b/test/test_flowtask.py new file mode 100644 index 000000000..aaa4b0932 --- /dev/null +++ b/test/test_flowtask.py @@ -0,0 +1,61 @@ +import os +import unittest +from datetime import time +from pathlib import Path + +import requests_mock + +import tableauserverclient as TSC +from tableauserverclient.datetime_helpers import parse_datetime +from tableauserverclient.models.task_item import TaskItem + +TEST_ASSET_DIR = Path(__file__).parent / "assets" + +GET_XML_NO_WORKBOOK = os.path.join(TEST_ASSET_DIR, "tasks_no_workbook_or_datasource.xml") +GET_XML_WITH_WORKBOOK = os.path.join(TEST_ASSET_DIR, "tasks_with_workbook.xml") +GET_XML_WITH_DATASOURCE = os.path.join(TEST_ASSET_DIR, "tasks_with_datasource.xml") +GET_XML_RUN_NOW_RESPONSE = os.path.join(TEST_ASSET_DIR, "tasks_run_now_response.xml") +GET_XML_CREATE_TASK_RESPONSE = os.path.join(TEST_ASSET_DIR, "tasks_create_extract_task.xml") +GET_XML_WITHOUT_SCHEDULE = TEST_ASSET_DIR / "tasks_without_schedule.xml" +GET_XML_WITH_INTERVAL = TEST_ASSET_DIR / "tasks_with_interval.xml" + +GET_XML_CREATE_FLOW_TASK_RESPONSE = os.path.join(TEST_ASSET_DIR, "tasks_create_flow_task.xml") + + + +class TaskTests(unittest.TestCase): + def setUp(self): + self.server = TSC.Server("http://test", False) + self.server.version = "3.22" + + # Fake Signin + self.server._site_id = "dad65087-b08b-4603-af4e-2887b8aafc67" + self.server._auth_token = "j80k54ll2lfMZ0tv97mlPvvSCRyD0DOM" + + # default task type is extractRefreshes TODO change this + # self.baseurl = "{}/{}".format(self.server.tasks.baseurl, "extractRefreshes") + self.baseurl = self.server.flow_tasks.baseurl + + def test_create_flow_task(self): + monthly_interval = TSC.MonthlyInterval(start_time=time(23, 30), interval_value=15) + monthly_schedule = TSC.ScheduleItem( + None, + None, + None, + None, + monthly_interval, + ) + target_item = TSC.Target("flow_id", "flow") + + task = TaskItem(schedule_item=monthly_schedule, target=target_item) + # task = TaskItem(None, "FullRefresh", None, schedule_item=monthly_schedule, target=target_item) + + with open(GET_XML_CREATE_FLOW_TASK_RESPONSE, "rb") as f: + response_xml = f.read().decode("utf-8") + with requests_mock.mock() as m: + m.post("{}".format(self.baseurl), text=response_xml) + create_response_content = self.server.flow_tasks.create(task).decode("utf-8") + + self.assertTrue("task_id" in create_response_content) + self.assertTrue("flow_id" in create_response_content) + #self.assertTrue("FullRefresh" in create_response_content) From 67812858dd4ce43154d8ce9e22fbdc069875ffce Mon Sep 17 00:00:00 2001 From: "liu.r" Date: Wed, 8 May 2024 11:44:54 -0700 Subject: [PATCH 2/9] cleanup --- tableauserverclient/server/endpoint/__init__.py | 1 + tableauserverclient/server/endpoint/flow_task_endpoint.py | 2 +- tableauserverclient/server/request_factory.py | 1 + test/test_flowtask.py | 4 ---- 4 files changed, 3 insertions(+), 5 deletions(-) diff --git a/tableauserverclient/server/endpoint/__init__.py b/tableauserverclient/server/endpoint/__init__.py index c018d8334..b2f291369 100644 --- a/tableauserverclient/server/endpoint/__init__.py +++ b/tableauserverclient/server/endpoint/__init__.py @@ -10,6 +10,7 @@ from .fileuploads_endpoint import Fileuploads from .flow_runs_endpoint import FlowRuns from .flows_endpoint import Flows +from .flow_task_endpoint import FlowTasks from .groups_endpoint import Groups from .jobs_endpoint import Jobs from .metadata_endpoint import Metadata diff --git a/tableauserverclient/server/endpoint/flow_task_endpoint.py b/tableauserverclient/server/endpoint/flow_task_endpoint.py index 1e53b22f1..18a9c2550 100644 --- a/tableauserverclient/server/endpoint/flow_task_endpoint.py +++ b/tableauserverclient/server/endpoint/flow_task_endpoint.py @@ -24,6 +24,6 @@ def create(self, flow_item: TaskItem) -> TaskItem: raise ValueError(error) logger.info("Creating an flow task %s", flow_item) url = self.baseurl - create_req = RequestFactory.Task.create_flow_task_req(flow_item) + create_req = RequestFactory.FlowTask.create_flow_task_req(flow_item) server_response = self.post_request(url, create_req) return server_response.content \ No newline at end of file diff --git a/tableauserverclient/server/request_factory.py b/tableauserverclient/server/request_factory.py index 904df1215..825451187 100644 --- a/tableauserverclient/server/request_factory.py +++ b/tableauserverclient/server/request_factory.py @@ -1290,6 +1290,7 @@ class RequestFactory(object): Favorite = FavoriteRequest() Fileupload = FileuploadRequest() Flow = FlowRequest() + FlowTask = FlowTaskRequest() Group = GroupRequest() Metric = MetricRequest() Permission = PermissionRequest() diff --git a/test/test_flowtask.py b/test/test_flowtask.py index aaa4b0932..8588d5701 100644 --- a/test/test_flowtask.py +++ b/test/test_flowtask.py @@ -32,8 +32,6 @@ def setUp(self): self.server._site_id = "dad65087-b08b-4603-af4e-2887b8aafc67" self.server._auth_token = "j80k54ll2lfMZ0tv97mlPvvSCRyD0DOM" - # default task type is extractRefreshes TODO change this - # self.baseurl = "{}/{}".format(self.server.tasks.baseurl, "extractRefreshes") self.baseurl = self.server.flow_tasks.baseurl def test_create_flow_task(self): @@ -48,7 +46,6 @@ def test_create_flow_task(self): target_item = TSC.Target("flow_id", "flow") task = TaskItem(schedule_item=monthly_schedule, target=target_item) - # task = TaskItem(None, "FullRefresh", None, schedule_item=monthly_schedule, target=target_item) with open(GET_XML_CREATE_FLOW_TASK_RESPONSE, "rb") as f: response_xml = f.read().decode("utf-8") @@ -58,4 +55,3 @@ def test_create_flow_task(self): self.assertTrue("task_id" in create_response_content) self.assertTrue("flow_id" in create_response_content) - #self.assertTrue("FullRefresh" in create_response_content) From 06b76d6dbce43cecb1b872d265c764b614d4fad7 Mon Sep 17 00:00:00 2001 From: "liu.r" Date: Wed, 8 May 2024 14:12:53 -0700 Subject: [PATCH 3/9] black format --- tableauserverclient/server/endpoint/flow_task_endpoint.py | 2 +- tableauserverclient/server/request_factory.py | 8 +++++--- test/test_flowtask.py | 1 - 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/tableauserverclient/server/endpoint/flow_task_endpoint.py b/tableauserverclient/server/endpoint/flow_task_endpoint.py index 18a9c2550..eea3f9710 100644 --- a/tableauserverclient/server/endpoint/flow_task_endpoint.py +++ b/tableauserverclient/server/endpoint/flow_task_endpoint.py @@ -26,4 +26,4 @@ def create(self, flow_item: TaskItem) -> TaskItem: url = self.baseurl create_req = RequestFactory.FlowTask.create_flow_task_req(flow_item) server_response = self.post_request(url, create_req) - return server_response.content \ No newline at end of file + return server_response.content diff --git a/tableauserverclient/server/request_factory.py b/tableauserverclient/server/request_factory.py index 825451187..cca4b82a6 100644 --- a/tableauserverclient/server/request_factory.py +++ b/tableauserverclient/server/request_factory.py @@ -972,9 +972,9 @@ def update_req(self, workbook_item): if data_freshness_policy_config.option == "FreshEvery": if data_freshness_policy_config.fresh_every_schedule is not None: fresh_every_element = ET.SubElement(data_freshness_policy_element, "freshEverySchedule") - fresh_every_element.attrib[ - "frequency" - ] = data_freshness_policy_config.fresh_every_schedule.frequency + fresh_every_element.attrib["frequency"] = ( + data_freshness_policy_config.fresh_every_schedule.frequency + ) fresh_every_element.attrib["value"] = str(data_freshness_policy_config.fresh_every_schedule.value) else: raise ValueError(f"data_freshness_policy_config.fresh_every_schedule must be populated.") @@ -1113,6 +1113,7 @@ def create_extract_req(self, xml_request: ET.Element, extract_item: "TaskItem") return ET.tostring(xml_request) + class FlowTaskRequest(object): @_tsrequest_wrapped def run_req(self, xml_request, task_item): @@ -1151,6 +1152,7 @@ def create_flow_task_req(self, xml_request: ET.Element, flow_item: "TaskItem") - return ET.tostring(xml_request) + class SubscriptionRequest(object): @_tsrequest_wrapped def create_req(self, xml_request: ET.Element, subscription_item: "SubscriptionItem") -> bytes: diff --git a/test/test_flowtask.py b/test/test_flowtask.py index 8588d5701..61a09b429 100644 --- a/test/test_flowtask.py +++ b/test/test_flowtask.py @@ -22,7 +22,6 @@ GET_XML_CREATE_FLOW_TASK_RESPONSE = os.path.join(TEST_ASSET_DIR, "tasks_create_flow_task.xml") - class TaskTests(unittest.TestCase): def setUp(self): self.server = TSC.Server("http://test", False) From 4735bd31185c6dec8b1fdccce86ee8aa32f129dd Mon Sep 17 00:00:00 2001 From: "liu.r" Date: Wed, 8 May 2024 14:29:26 -0700 Subject: [PATCH 4/9] add xml --- test/assets/tasks_create_flow_task.xml | 14 ++++++++++++++ test/test_flowtask.py | 9 --------- 2 files changed, 14 insertions(+), 9 deletions(-) create mode 100644 test/assets/tasks_create_flow_task.xml diff --git a/test/assets/tasks_create_flow_task.xml b/test/assets/tasks_create_flow_task.xml new file mode 100644 index 000000000..44826a94a --- /dev/null +++ b/test/assets/tasks_create_flow_task.xml @@ -0,0 +1,14 @@ + + + + + + + + + + + + + + \ No newline at end of file diff --git a/test/test_flowtask.py b/test/test_flowtask.py index 61a09b429..1f7d82c30 100644 --- a/test/test_flowtask.py +++ b/test/test_flowtask.py @@ -10,15 +10,6 @@ from tableauserverclient.models.task_item import TaskItem TEST_ASSET_DIR = Path(__file__).parent / "assets" - -GET_XML_NO_WORKBOOK = os.path.join(TEST_ASSET_DIR, "tasks_no_workbook_or_datasource.xml") -GET_XML_WITH_WORKBOOK = os.path.join(TEST_ASSET_DIR, "tasks_with_workbook.xml") -GET_XML_WITH_DATASOURCE = os.path.join(TEST_ASSET_DIR, "tasks_with_datasource.xml") -GET_XML_RUN_NOW_RESPONSE = os.path.join(TEST_ASSET_DIR, "tasks_run_now_response.xml") -GET_XML_CREATE_TASK_RESPONSE = os.path.join(TEST_ASSET_DIR, "tasks_create_extract_task.xml") -GET_XML_WITHOUT_SCHEDULE = TEST_ASSET_DIR / "tasks_without_schedule.xml" -GET_XML_WITH_INTERVAL = TEST_ASSET_DIR / "tasks_with_interval.xml" - GET_XML_CREATE_FLOW_TASK_RESPONSE = os.path.join(TEST_ASSET_DIR, "tasks_create_flow_task.xml") From d6fd8291378d2393a02a8dc96cd46853d2455515 Mon Sep 17 00:00:00 2001 From: "liu.r" Date: Wed, 8 May 2024 15:17:40 -0700 Subject: [PATCH 5/9] edit test initialization --- test/assets/tasks_create_flow_task.xml | 38 ++++++++++++++++++-------- test/test_flowtask.py | 8 +++--- 2 files changed, 30 insertions(+), 16 deletions(-) diff --git a/test/assets/tasks_create_flow_task.xml b/test/assets/tasks_create_flow_task.xml index 44826a94a..b5a6aa6f4 100644 --- a/test/assets/tasks_create_flow_task.xml +++ b/test/assets/tasks_create_flow_task.xml @@ -1,14 +1,28 @@ - - - - - - - - - - - - + + + + + + + + + + + + + \ No newline at end of file diff --git a/test/test_flowtask.py b/test/test_flowtask.py index 1f7d82c30..ed2627147 100644 --- a/test/test_flowtask.py +++ b/test/test_flowtask.py @@ -27,10 +27,10 @@ def setUp(self): def test_create_flow_task(self): monthly_interval = TSC.MonthlyInterval(start_time=time(23, 30), interval_value=15) monthly_schedule = TSC.ScheduleItem( - None, - None, - None, - None, + "Monthly Schedule", + 50, + TSC.ScheduleItem.Type.Flow, + TSC.ScheduleItem.ExecutionOrder.Parallel, monthly_interval, ) target_item = TSC.Target("flow_id", "flow") From 7f11a6d4ff7d4da1d526784d30ef30182f9592aa Mon Sep 17 00:00:00 2001 From: "liu.r" Date: Wed, 8 May 2024 15:31:14 -0700 Subject: [PATCH 6/9] fix task initialization --- test/test_flowtask.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_flowtask.py b/test/test_flowtask.py index ed2627147..dd2d07eef 100644 --- a/test/test_flowtask.py +++ b/test/test_flowtask.py @@ -35,7 +35,7 @@ def test_create_flow_task(self): ) target_item = TSC.Target("flow_id", "flow") - task = TaskItem(schedule_item=monthly_schedule, target=target_item) + task = TaskItem(None, "RunFlow", None, schedule_item=monthly_schedule, target=target_item) with open(GET_XML_CREATE_FLOW_TASK_RESPONSE, "rb") as f: response_xml = f.read().decode("utf-8") From c746957b3293f1fedc46af86f07432d86bc803b5 Mon Sep 17 00:00:00 2001 From: "liu.r" Date: Wed, 8 May 2024 15:45:12 -0700 Subject: [PATCH 7/9] third times the charm --- test/assets/tasks_create_flow_task.xml | 12 ++++++------ test/test_flowtask.py | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/test/assets/tasks_create_flow_task.xml b/test/assets/tasks_create_flow_task.xml index b5a6aa6f4..11c9a4ff0 100644 --- a/test/assets/tasks_create_flow_task.xml +++ b/test/assets/tasks_create_flow_task.xml @@ -1,11 +1,11 @@ - - - - + - diff --git a/test/test_flowtask.py b/test/test_flowtask.py index dd2d07eef..034066e64 100644 --- a/test/test_flowtask.py +++ b/test/test_flowtask.py @@ -43,5 +43,5 @@ def test_create_flow_task(self): m.post("{}".format(self.baseurl), text=response_xml) create_response_content = self.server.flow_tasks.create(task).decode("utf-8") - self.assertTrue("task_id" in create_response_content) + self.assertTrue("schedule_id" in create_response_content) self.assertTrue("flow_id" in create_response_content) From 0e5ce785d601a3c013c97a305188d281a867c866 Mon Sep 17 00:00:00 2001 From: "liu.r" Date: Wed, 8 May 2024 15:51:58 -0700 Subject: [PATCH 8/9] cleanup --- tableauserverclient/server/request_factory.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/tableauserverclient/server/request_factory.py b/tableauserverclient/server/request_factory.py index cca4b82a6..61507ea2e 100644 --- a/tableauserverclient/server/request_factory.py +++ b/tableauserverclient/server/request_factory.py @@ -1115,11 +1115,6 @@ def create_extract_req(self, xml_request: ET.Element, extract_item: "TaskItem") class FlowTaskRequest(object): - @_tsrequest_wrapped - def run_req(self, xml_request, task_item): - # Send an empty tsRequest - pass - @_tsrequest_wrapped def create_flow_task_req(self, xml_request: ET.Element, flow_item: "TaskItem") -> bytes: flow_element = ET.SubElement(xml_request, "runFlow") From bcb02ac5e294246e07859ddc1281bba11b58ee09 Mon Sep 17 00:00:00 2001 From: "liu.r" Date: Thu, 9 May 2024 17:33:27 -0700 Subject: [PATCH 9/9] fix formatting --- tableauserverclient/server/request_factory.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tableauserverclient/server/request_factory.py b/tableauserverclient/server/request_factory.py index 61507ea2e..c204e7217 100644 --- a/tableauserverclient/server/request_factory.py +++ b/tableauserverclient/server/request_factory.py @@ -972,9 +972,9 @@ def update_req(self, workbook_item): if data_freshness_policy_config.option == "FreshEvery": if data_freshness_policy_config.fresh_every_schedule is not None: fresh_every_element = ET.SubElement(data_freshness_policy_element, "freshEverySchedule") - fresh_every_element.attrib["frequency"] = ( - data_freshness_policy_config.fresh_every_schedule.frequency - ) + fresh_every_element.attrib[ + "frequency" + ] = data_freshness_policy_config.fresh_every_schedule.frequency fresh_every_element.attrib["value"] = str(data_freshness_policy_config.fresh_every_schedule.value) else: raise ValueError(f"data_freshness_policy_config.fresh_every_schedule must be populated.")