10000 Merge pull request #1371 from liu-rebecca/add-create-cloud-flow-task · tableau/server-client-python@1e1f21c · GitHub
[go: up one dir, main page]

Skip to content

Commit 1e1f21c

Browse files
authored
Merge pull request #1371 from liu-rebecca/add-create-cloud-flow-task
@W-14332894 - Add create cloud flow task support
2 parents eaedc29 + bcb02ac commit 1e1f21c

File tree

7 files changed

+143
-0
lines changed

7 files changed

+143
-0
lines changed

tableauserverclient/models/task_item.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ class Type:
1818
_TASK_TYPE_MAPPING = {
1919
"RefreshExtractTask": Type.ExtractRefresh,
2020
"MaterializeViewsTask": Type.DataAcceleration,
21+
"RunFlowTask": Type.RunFlow,
2122
}
2223

2324
def __init__(

tableauserverclient/server/endpoint/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from .fileuploads_endpoint import Fileuploads
1111
from .flow_runs_endpoint import FlowRuns
1212
from .flows_endpoint import Flows
13+
from .flow_task_endpoint import FlowTasks
1314
from .groups_endpoint import Groups
1415
from .jobs_endpoint import Jobs
1516
from .metadata_endpoint import Metadata
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import logging
2+
from typing import List, Optional, Tuple, TYPE_CHECKING
3+
4+
from tableauserverclient.server.endpoint.endpoint import Endpoint, api
5+
from tableauserverclient.server.endpoint.exceptions import MissingRequiredFieldError
6+
from tableauserverclient.models import TaskItem, PaginationItem
7+
from tableauserverclient.server import RequestFactory
8+
9+
from tableauserverclient.helpers.logging import logger
10+
11+
if TYPE_CHECKING:
12+
from tableauserverclient.server.request_options import RequestOptions
13+
14+
15+
class FlowTasks(Endpoint):
16+
@property
17+
def baseurl(self) -> str:
18+
return "{0}/sites/{1}/tasks/flows".format(self.parent_srv.baseurl, self.parent_srv.site_id)
19+
20+
@api(version="3.22")
21+
def create(self, flow_item: TaskItem) -> TaskItem:
22+
if not flow_item:
23+
error = "No flow provided"
24+
raise ValueError(error)
25+
logger.info("Creating an flow task %s", flow_item)
26+
url = self.baseurl
27+
create_req = RequestFactory.FlowTask.create_flow_task_req(flow_item)
28+
server_response = self.post_request(url, create_req)
29+
return server_response.content

tableauserverclient/server/request_factory.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1114,6 +1114,40 @@ def create_extract_req(self, xml_request: ET.Element, extract_item: "TaskItem")
11141114
return ET.tostring(xml_request)
11151115

11161116

1117+
class FlowTaskRequest(object):
1118+
@_tsrequest_wrapped
1119+
def create_flow_task_req(self, xml_request: ET.Element, flow_item: "TaskItem") -> bytes:
1120+
flow_element = ET.SubElement(xml_request, "runFlow")
1121+
1122+
# Main attributes
1123+
flow_element.attrib["type"] = flow_item.task_type
1124+
1125+
if flow_item.target is not None:
1126+
target_element = ET.SubElement(flow_element, flow_item.target.type)
1127+
target_element.attrib["id"] = flow_item.target.id
1128+
1129+
if flow_item.schedule_item is None:
1130+
return ET.tostring(xml_request)
1131+
1132+
# Schedule attributes
1133+
schedule_element = ET.SubElement(xml_request, "schedule")
1134+
1135+
interval_item = flow_item.schedule_item.interval_item
1136+
schedule_element.attrib["frequency"] = interval_item._frequency
1137+
frequency_element = ET.SubElement(schedule_element, "frequencyDetails")
1138+
frequency_element.attrib["start"] = str(interval_item.start_time)
1139+
if hasattr(interval_item, "end_time") and interval_item.end_time is not None:
1140+
frequency_element.attrib["end"] = str(interval_item.end_time)
1141+
if hasattr(interval_item, "interval") and interval_item.interval:
1142+
intervals_element = ET.SubElement(frequency_element, "intervals")
1143+
for interval in interval_item._interval_type_pairs(): # type: ignore
1144+
expression, value = interval
1145+
single_interval_element = ET.SubElement(intervals_element, "interval")
1146+
single_interval_element.attrib[expression] = value
1147+
1148+
return ET.tostring(xml_request)
1149+
1150+
11171151
class SubscriptionRequest(object):
11181152
@_tsrequest_wrapped
11191153
def create_req(self, xml_request: ET.Element, subscription_item: "SubscriptionItem") -> bytes:
@@ -1253,6 +1287,7 @@ class RequestFactory(object):
12531287
Favorite = FavoriteRequest()
12541288
Fileupload = FileuploadRequest()
12551289
Flow = FlowRequest()
1290+
FlowTask = FlowTaskRequest()
12561291
Group = GroupRequest()
12571292
Metric = MetricRequest()
12581293
Permission = PermissionRequest()

tableauserverclient/server/server.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
Databases,
2626
Tables,
2727
Flows,
28+
FlowTasks,
2829
Webhooks,
2930
DataAccelerationReport,
3031
Favorites,
@@ -82,6 +83,7 @@ def __init__(self, server_address, use_server_version=False, http_options=None,
8283
self.datasources = Datasources(self)
8384
self.favorites = Favorites(self)
8485
self.flows = Flows(self)
86+
self.flow_tasks = FlowTasks(self)
8587
self.projects = Projects(self)
8688
self.schedules = Schedules(self)
8789
self.server_info = ServerInfo(self)
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
<tsResponse xmlns="http://tableau.com/api" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tableau.com/api https://help.tableau.com/samples/en-us/rest_api/ts-api_3_22.xsd">
2+
<task>
3+
<flowRun id="flow_run_id"
4+
priority="50"
5+
consecutiveFailedCount="0"
6+
type="RunFlowTask">
7+
<schedule id="schedule_id"
8+
name="schedu;e_name"
9+
state="Active"
10+
priority="50"
11+
createdAt="2024-04-09T18:54:12Z"
12+
updatedAt="2024-04-09T18:54:12Z"
13+
type="Flow"
14+
frequency="Daily"
15+
nextRunAt="2024-04-10T19:30:00Z"/>
16+
<flow id="flow_id"
17+
name="olympic 1">
18+
</flow>
19+
<flowRunSpec flowId="flow_id">
20+
<flowOutputSteps>
21+
<flowOutputStep id="flow_output_setp_id"
22+
name="Output"/>
23+
</flowOutputSteps>
24+
<flowParameterSpecs/>
25+
</flowRunSpec>
26+
</flowRun>
27+
</task>
28+
</tsResponse>

test/test_flowtask.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import os
2+
import unittest
3+
from datetime import time
4+
from pathlib import Path
5+
6+
import requests_mock
7+
8+
import tableauserverclient as TSC
9+
from tableauserverclient.datetime_helpers import parse_datetime
10+
from tableauserverclient.models.task_item import TaskItem
11+
12+
TEST_ASSET_DIR = Path(__file__).parent / "assets"
13+
GET_XML_CREATE_FLOW_TASK_RESPONSE = os.path.join(TEST_ASSET_DIR, "tasks_create_flow_task.xml")
14+
15+
16+
class TaskTests(unittest.TestCase):
17+
def setUp(self):
18+
self.server = TSC.Server("http://test", False)
19+
self.server.version = "3.22"
20+
21+
# Fake Signin
22+
self.server._site_id = "dad65087-b08b-4603-af4e-2887b8aafc67"
23+
self.server._auth_token = "j80k54ll2lfMZ0tv97mlPvvSCRyD0DOM"
24+
25+
self.baseurl = self.server.flow_tasks.baseurl
26+
27+
def test_create_flow_task(self):
28+
monthly_interval = TSC.MonthlyInterval(start_time=time(23, 30), interval_value=15)
29+
monthly_schedule = TSC.ScheduleItem(
30+
"Monthly Schedule",
31+
50,
32+
TSC.ScheduleItem.Type.Flow,
33+
TSC.ScheduleItem.ExecutionOrder.Parallel,
34+
monthly_interval,
35+
)
36+
target_item = TSC.Target("flow_id", "flow")
37+
38+
task = TaskItem(None, "RunFlow", None, schedule_item=monthly_schedule, target=target_item)
39+
40+
with open(GET_XML_CREATE_FLOW_TASK_RESPONSE, "rb") as f:
41+
response_xml = f.read().decode("utf-8")
42+
with requests_mock.mock() as m:
43+
m.post("{}".format(self.baseurl), text=response_xml)
44+
create_response_content = self.server.flow_tasks.create(task).decode("utf-8")
45+
46+
self.assertTrue("schedule_id" in create_response_content)
47+
self.assertTrue("flow_id" in create_response_content)

0 commit comments

Comments
 (0)
0