8000 Add FlowRun Item and Endpoints. by jorwoods · Pull Request #884 · tableau/server-client-python · GitHub
[go: up one dir, main page]

Skip to content

Add FlowRun Item and Endpoints. #884

8000
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Oct 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/run-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
python-version: [3.5, 3.6, 3.7, 3.8, 3.9, 3.10.0-rc.2]
python-version: [3.6, 3.7, 3.8, 3.9, 3.10.0-rc.2]

runs-on: ${{ matrix.os }}

Expand Down
1 change: 1 addition & 0 deletions tableauserverclient/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
FlowItem,
WebhookItem,
PersonalAccessTokenAuth,
FlowRunItem
)
from .server import (
RequestOptions,
Expand Down
1 change: 1 addition & 0 deletions tableauserverclient/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from .favorites_item import FavoriteItem
from .group_item import GroupItem
from .flow_item import FlowItem
from .flow_run_item import FlowRunItem
from .interval_item import (
IntervalItem,
DailyInterval,
Expand Down
106 changes: 106 additions & 0 deletions tableauserverclient/models/flow_run_item.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import xml.etree.ElementTree as ET
from ..datetime_helpers import parse_datetime
import itertools


class FlowRunItem(object):
def __init__(self) -> None:
self._id=None
self._flow_id=None
self._status=None
self._started_at=None
self._completed_at=None
self._progress=None
self._background_job_id=None


@property
def id(self):
return self._id


@property
def flow_id(self):
return self._flow_id


@property
def status(self):
return self._status


@property
def started_at(self):
return self._started_at


@property
def completed_at(self):
return self._completed_at


@property
def progress(self):
return self._progress


@property
def background_job_id(self):
return self._background_job_id


def _set_values(
self,
id,
flow_id,
status,
started_at,
completed_at,
progress,
background_job_id,
):
if id is not None:
self._id = id
if flow_id is not None:
self._flow_id = flow_id
if status is not None:
self._status = status
if started_at is not None:
self._started_at = started_at
if completed_at is not None:
self._completed_at = completed_at
if progress is not None:
self._progress = progress
if background_job_id is not None:
self._background_job_id = background_job_id


@classmethod
def from_response(cls, resp, ns):
all_flowrun_items = list()
parsed_response = ET.fromstring(resp)
all_flowrun_xml = itertools.chain(
parsed_response.findall(".//t:flowRun[@id]", namespaces=ns),
parsed_response.findall(".//t:flowRuns[@id]", namespaces=ns)
)

for flowrun_xml in all_flowrun_xml:
parsed = cls._parse_element(flowrun_xml, ns)
flowrun_item = cls()
flowrun_item._set_values(**parsed)
all_flowrun_items.append(flowrun_item)
return all_flowrun_items


@staticmethod
def _parse_element(flowrun_xml, ns):
result = {}
result['id'] = flowrun_xml.get("id", None)
result['flow_id'] = flowrun_xml.get("flowId", None)
result['status'] = flowrun_xml.get("status", None)
result['started_at'] = parse_datetime(flowrun_xml.get("startedAt", None))
result['completed_at'] = parse_datetime(flowrun_xml.get("completedAt", None))
result['progress'] = flowrun_xml.get("progress", None)
result['background_job_id'] = flowrun_xml.get("backgroundJobId", None)

return result
19 changes: 19 additions & 0 deletions tableauserverclient/models/job_item.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import xml.etree.ElementTree as ET
from .flow_run_item import FlowRunItem
from ..datetime_helpers import parse_datetime


Expand All @@ -24,6 +25,7 @@ def __init__(
finish_code=0,
notes=None,
mode=None,
flow_run=None,
):
self._id = id_
self._type = job_type
Expand All @@ -34,6 +36,7 @@ def __init__(
self._finish_code = finish_code
self._notes = notes or []
self._mode = mode
self._flow_run = flow_run

@property
def id(self):
Expand Down Expand Up @@ -76,6 +79,14 @@ def mode(self, value):
# check for valid data here
self._mode = value

@property
def flow_run(self):
return self._flow_run

@flow_run.setter
def flow_run(self, value):
self._flow_run = value

def __repr__(self):
return (
"<Job#{_id} {_type} created_at({_created_at}) started_at({_started_at}) completed_at({_completed_at})"
Expand All @@ -102,6 +113,13 @@ def _parse_element(cls, element, ns):
finish_code = int(element.get("finishCode", -1))
notes = [note.text for note in element.findall(".//t:notes", namespaces=ns)] or None
mode = element.get("mode", None)
flow_run = None
for flow_job in element.findall(".//t:runFlowJobType", namespaces=ns):
flow_run = FlowRunItem()
flow_run._id = flow_job.get("flowRunId", None)
for flow in flow_job.findall(".//t:flow", namespaces=ns):
flow_run._flow_id = flow.get("id", None)
flow_run._started_at = created_at or started_at
return cls(
id_,
type_,
Expand All @@ -112,6 +130,7 @@ def _parse_element(cls, element, ns):
finish_code,
notes,
mode,
flow_run,
)


Expand Down
1 change: 1 addition & 0 deletions tableauserverclient/server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
ColumnItem,
FlowItem,
WebhookItem,
FlowRunItem
)
from .endpoint import (
Auth,
Expand Down
1 change: 1 addition & 0 deletions tableauserverclient/server/endpoint/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from .favorites_endpoint import Favorites
from .fileuploads_endpoint import Fileuploads
from .flows_endpoint import Flows
from .flow_runs_endpoint import FlowRuns
from .exceptions import (
ServerResponseError,
MissingRequiredFieldError,
Expand Down
13 changes: 12 additions & 1 deletion tableauserverclient/server/endpoint/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,16 @@ def __str__(self):
return f"Job {self.job.id} failed with notes {self.notes}"


class JobCanceledException(JobFailedException):
class JobCancelledException(JobFailedException):
pass
class FlowRunFailedException(Exception):
def __init__(self, flow_run):
self.background_job_id = flow_run.background_job_id
self.flow_run = flow_run

def __str__(self):
return f"FlowRun {self.flow_run.id} failed with job id {self.background_job_id}"


class FlowRunCancelledException(FlowRunFailedException):
pass
76 changes: 76 additions & 0 deletions tableauserverclient/server/endpoint/flow_runs_endpoint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from .endpoint import Endpoint, QuerysetEndpoint, api
from .exceptions import FlowRunFailedException, FlowRunCancelledException
from .. import FlowRunItem, PaginationItem
from ...exponential_backoff import ExponentialBackoffTimer

import logging

logger = logging.getLogger("tableau.endpoint.flowruns")


class FlowRuns(QuerysetEndpoint):
def __init__(self, parent_srv):
super(FlowRuns, self).__init__(parent_srv)

@property
def baseurl(self):
return "{0}/sites/{1}/flows/runs".format(self.parent_srv.baseurl, self.parent_srv.site_id)

# Get all flows
@api(version="3.10")
def get(self, req_options=None):
logger.info("Querying all flow runs on site")
url = self.baseurl
server_response = self.get_request(url, req_options)
pagination_item = PaginationItem.from_response(server_response.content, self.parent_srv.namespace)
all_flow_run_items = FlowRunItem.from_response(server_response.content, self.parent_srv.namespace)
return all_flow_run_items, pagination_item

# Get 1 flow by id
@api(version="3.10")
def get_by_id(self, flow_run_id):
if not flow_run_id:
error = "Flow ID undefined."
raise ValueError(error)
logger.info("Querying single flow (ID: {0})".format(flow_run_id))
url = "{0}/{1}".format(self.baseurl, flow_run_id)
server_response = self.get_request(url)
return FlowRunItem.from_response(server_response.content, self.parent_srv.namespace)[0]


# Cancel 1 flow run by id
@api(version="3.10")
def cancel(self, flow_run_id):
if not flow_run_id:
error = "Flow ID undefined."
raise ValueError(error)
id_ = getattr(flow_run_id, 'id', flow_run_id)
url = "{0}/{1}".format(self.baseurl, id_)
self.put_request(url)
logger.info("Deleted single flow (ID: {0})".format(id_))


@api(version="3.10")
def wait_for_job(self, flow_run_id, *, timeout=None):
if isinstance(flow_run_id, FlowRunItem):
flow_run_id = flow_run_id.id
assert isinstance(flow_run_id, str)
logger.debug(f"Waiting for flow run {flow_run_id}")

backoffTimer = ExponentialBackoffTimer(timeout=timeout)
flow_run = self.get_by_id(flow_run_id)
while flow_run.completed_at is None:
backoffTimer.sleep()
flow_run = self.get_by_id(flow_run_id)
logger.debug(f"\tFlowRun {flow_run_id} progress={flow_run.progress}")

logger.info("FlowRun {} Completed: Status: {}".format(flow_run_id, flow_run.status))

if flow_run.status == "Success":
return flow_run
elif flow_run.status == "Failed":
raise FlowRunFailedException(flow_run)
elif flow_run.status == "Cancelled":
raise FlowRunCancelledException(flow_run)
else:
raise AssertionError("Unexpected status in flow_run", flow_run)
5 changes: 3 additions & 2 deletions tableauserverclient/server/endpoint/jobs_endpoint.py
87FD
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from .endpoint import Endpoint, api
from .exceptions import JobCanceledException, JobFailedException
from .exceptions import JobCancelledException, JobFailedException
from .. import JobItem, BackgroundJobItem, PaginationItem
from ..request_options import RequestOptionsBase
from ...exponential_backoff import ExponentialBackoffTimer
Expand Down Expand Up @@ -50,6 +50,7 @@ def get_by_id(self, job_id):
new_job = JobItem.from_response(server_response.content, self.parent_srv.namespace)[0]
return new_job

@api(version="2.6")
def wait_for_job(self, job_id, *, timeout=None):
if isinstance(job_id, JobItem):
job_id = job_id.id
Expand All @@ -70,6 +71,6 @@ def wait_for_job(self, job_id, *, timeout=None):
elif job.finish_code == JobItem.FinishCode.Failed:
raise JobFailedException(job)
elif job.finish_code == JobItem.FinishCode.Cancelled:
raise JobCanceledException(job)
raise JobCancelledException(job)
else:
raise AssertionError("Unexpected finish_code in job", job)
2 changes: 2 additions & 0 deletions tableauserverclient/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
Favorites,
DataAlerts,
Fileuploads,
FlowRuns
)
from .endpoint.exceptions import (
EndpointUnavailableError,
Expand Down Expand Up @@ -85,6 +86,7 @@ def __init__(self, server_address, use_server_version=False):
self.data_alerts = DataAlerts(self)
self.fileuploads = Fileuploads(self)
self._namespace = Namespace()
self.flow_runs = FlowRuns(self)

if use_server_version:
self.use_server_version()
Expand Down
5 changes: 4 additions & 1 deletion test/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ def sleep_mock(interval):
def get_time():
return mock_time

patch = unittest.mock.patch
try:
patch = unittest.mock.patch
except AttributeError:
from unittest.mock import patch
with patch("time.sleep", sleep_mock), patch("time.time", get_time):
yield get_time
11 changes: 11 additions & 0 deletions test/assets/flow_refresh.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?xml version='1.0' encoding='UTF-8'?>
<tsResponse xmlns="http://tableau.com/api" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tableau.com/api http://tableau.com/api/ts-api-3.5.xsd">
<job id="d1b2ccd0-6dfa-444a-aee4-723dbd6b7c9d"
mode="Asynchronous"
type="RunFlow"
createdAt="2018-05-22T13:00:29Z">
<runFlowJobType flowRunId="e0c3067f-2333-4eee-8028-e0a56ca496f6">
<flow id="92967d2d-c7e2-46d0-8847-4802df58f484" name="FlowOne"/>
</runFlowJobType>
</job>
</tsResponse>
19 changes: 19 additions & 0 deletions test/assets/flow_runs_get.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<tsResponse xmlns="http://tableau.com/api" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tableau.com/api http://tableau.com/api/ts-api-3.10.xsd">
<pagination pageNumber="1" pageSize="100" totalAvailable="2"/>
<flowRuns>
<flowRuns id="cc2e652d-4a9b-4476-8c93-b238c45db968"
flowId="587daa37-b84d-4400-a9a2-aa90e0be7837"
status="Success"
startedAt="2021-02-11T01:42:55Z"
completedAt="2021-02-11T01:57:38Z"
progress="100"
backgroundJobId="aa23f4ac-906f-11e9-86fb-3f0f71412e77"/>
<flowRuns id="a3104526-c0c6-4ea5-8362-e03fc7cbd7ee"
flowId="5c36be69-eb30-461b-b66e-3e2a8e27cc35"
status="Failed"
startedAt="2021-02-13T04:05:30Z"
completedAt="2021-02-13T04:05:35Z"
progress="100"
backgroundJobId="1ad21a9d-2530-4fbf-9064-efd3c736e023"/>
</flowRuns>
</tsResponse>
10 changes: 10 additions & 0 deletions test/assets/flow_runs_get_by_id.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<tsResponse xmlns="http://tableau.com/api" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tableau.com/api http://tableau.com/api/ts-api-3.10.xsd">
<flowRun id="cc2e652d-4a9b-4476-8c93-b238c45db968"
flowId="587daa37-b84d-4400-a9a2-aa90e0be7837"
status="Success"
startedAt="2021-02-11T01:42:55Z"
completedAt="2021-02-11T01:57:38Z"
progress="100"
backgroundJobId="1ad21a9d-2530-4fbf-9064-efd3c736e023">
</flowRun>
</tsResponse>
Loading
0