8000 Add FlowRun Item and Endpoints. (#884) · ramdesh/server-client-python@428eb55 · GitHub
[go: up one dir, main page]

Skip to content < 8000 react-partial partial-name="keyboard-shortcuts-dialog" data-ssr="false" data-attempted-ssr="false" data-react-profiling="false" >

Commit 428eb55

Browse files
jorwoodsJordan Woods
andauthored
Add FlowRun Item and Endpoints. (tableau#884)
* Add tests for fetching flow runs * Implement basics of FlowRuns * Add tests for cancel flow run * Make FlowRuns a Queryset endpoint for easier filtering * Add test for flow refresh endpoint * Align to naming conventions * Apply name change consistently * Change flowrun_id into flow_run_id * Add wait_for_job to FlowRun * Tag wait_for_job with version number * Rewrite flow_run to use ExponentialBackoffTimer * Test flow run wait with backoff * Remove 3.5 from test matrix * Standardize spelling of cancelled Co-authored-by: Jordan Woods <Jordan.Woods@mkcorp.com>
1 parent a8b3424 commit 428eb55

File tree

18 files changed

+410
-4
lines changed
  • tableauserverclient
    • models
      • < 8000 div class="PRIVATE_TreeView-item-level-line prc-TreeView-TreeViewItemLevelLine-KPSSL">
  • server
  • test
  • 18 files changed

    +410
    -4
    lines changed

    tableauserverclient/__init__.py

    Lines changed: 1 addition & 0 deletions
    Original file line numberDiff line numberDiff line change
    @@ -34,6 +34,7 @@
    3434
    FlowItem,
    3535
    WebhookItem,
    3636
    PersonalAccessTokenAuth,
    37+
    FlowRunItem
    3738
    )
    3839
    from .server import (
    3940
    RequestOptions,

    tableauserverclient/models/__init__.py

    Lines changed: 1 addition & 0 deletions
    Original file line numberDiff line numberDiff line change
    @@ -10,6 +10,7 @@
    1010
    from .favorites_item import FavoriteItem
    1111
    from .group_item import GroupItem
    1212
    from .flow_item import FlowItem
    13+
    from .flow_run_item import FlowRunItem
    1314
    from .interval_item import (
    1415
    IntervalItem,
    1516
    DailyInterval,
    Lines changed: 106 additions & 0 deletions
    9E88
    Original file line numberDiff line numberDiff line change
    @@ -0,0 +1,106 @@
    1+
    import xml.etree.ElementTree as ET
    2+
    from ..datetime_helpers import parse_datetime
    3+
    import itertools
    4+
    5+
    6+
    class FlowRunItem(object):
    7+
    def __init__(self) -> None:
    8+
    self._id=None
    9+
    self._flow_id=None
    10+
    self._status=None
    11+
    self._started_at=None
    12+
    self._completed_at=None
    13+
    self._progress=None
    14+
    self._background_job_id=None
    15+
    16+
    17+
    @property
    18+
    def id(self):
    19+
    return self._id
    20+
    21+
    22+
    @property
    23+
    def flow_id(self):
    24+
    return self._flow_id
    25+
    26+
    27+
    @property
    28+
    def status(self):
    29+
    return self._status
    30+
    31+
    32+
    @property
    33+
    def started_at(self):
    34+
    return self._started_at
    35+
    36+
    37+
    @property
    38+
    def completed_at(self):
    39+
    return self._completed_at
    40+
    41+
    42+
    @property
    43+
    def progress(self):
    44+
    return self._progress
    45+
    46+
    47+
    @property
    48+
    def background_job_id(self):
    49+
    return self._background_job_id
    50+
    51+
    52+
    def _set_values(
    53+
    self,
    54+
    id,
    55+
    flow_id,
    56+
    status,
    57+
    started_at,
    58+
    completed_at,
    59+
    progress,
    60+
    background_job_id,
    61+
    ):
    62+
    if id is not None:
    63+
    self._id = id
    64+
    if flow_id is not None:
    65+
    self._flow_id = flow_id
    66+
    if status is not None:
    67+
    self._status = status
    68+
    if started_at is not None:
    69+
    self._started_at = started_at
    70+
    if completed_at is not None:
    71+
    self._completed_at = completed_at
    72+
    if progress is not None:
    73+
    self._progress = progress
    74+
    if background_job_id is not None:
    75+
    self._background_job_id = background_job_id
    76+
    77+
    78+
    @classmethod
    79+
    def from_response(cls, resp, ns):
    80+
    all_flowrun_items = list()
    81+
    parsed_response = ET.fromstring(resp)
    82+
    all_flowrun_xml = itertools.chain(
    83+
    parsed_response.findall(".//t:flowRun[@id]", namespaces=ns),
    84+
    parsed_response.findall(".//t:flowRuns[@id]", namespaces=ns)
    85+
    )
    86+
    87+
    for flowrun_xml in all_flowrun_xml:
    88+
    parsed = cls._parse_element(flowrun_xml, ns)
    89+
    flowrun_item = cls()
    90+
    flowrun_item._set_values(**parsed)
    91+
    all_flowrun_items.append(flowrun_item)
    92+
    return all_flowrun_items
    93+
    94+
    95+
    @staticmethod
    96+
    def _parse_element(flowrun_xml, ns):
    97+
    result = {}
    98+
    result['id'] = flowrun_xml.get("id", None)
    99+
    result['flow_id'] = flowrun_xml.get("flowId", None)
    100+
    result['status'] = flowrun_xml.get("status", None)
    101+
    result['started_at'] = parse_datetime(flowrun_xml.get("startedAt", None))
    102+
    result['completed_at'] = parse_datetime(flowrun_xml.get("completedAt", None))
    103+
    result['progress'] = flowrun_xml.get("progress", None)
    104+
    result['background_job_id'] = flowrun_xml.get("backgroundJobId", None)
    105+
    106+
    return result

    tableauserverclient/models/job_item.py

    Lines changed: 19 additions & 0 deletions
    Original file line numberDiff line numberDiff line change
    @@ -1,4 +1,5 @@
    11
    import xml.etree.ElementTree as ET
    2+
    from .flow_run_item import FlowRunItem
    23
    from ..datetime_helpers import parse_datetime
    34

    45

    @@ -24,6 +25,7 @@ def __init__(
    2425
    finish_code=0,
    2526
    notes=None,
    2627
    mode=None,
    28+
    flow_run=None,
    2729
    ):
    2830
    self._id = id_
    2931
    self._type = job_type
    @@ -34,6 +36,7 @@ def __init__(
    3436
    self._finish_code = finish_code
    3537
    self._notes = notes or []
    3638
    self._mode = mode
    39+
    self._flow_run = flow_run
    3740

    3841
    @property
    3942
    def id(self):
    @@ -76,6 +79,14 @@ def mode(self, value):
    7679
    # check for valid data here
    7780
    self._mode = value
    7881

    82+
    @property
    83+
    def flow_run(self):
    84+
    return self._flow_run
    85+
    86+
    @flow_run.setter
    87+
    def flow_run(self, value):
    88+
    self._flow_run = value
    89+
    7990
    def __repr__(self):
    8091
    return (
    8192
    "<Job#{_id} {_type} created_at({_created_at}) started_at({_started_at}) completed_at({_completed_at})"
    @@ -102,6 +113,13 @@ def _parse_element(cls, element, ns):
    102113
    finish_code = int(element.get("finishCode", -1))
    103114
    notes = [note.text for note in element.findall(".//t:notes", namespaces=ns)] or None
    104115
    mode = element.get("mode", None)
    116+
    flow_run = None
    117+
    for flow_job in element.findall(".//t:runFlowJobType", namespaces=ns):
    118+
    flow_run = FlowRunItem()
    119+
    flow_run._id = flow_job.get("flowRunId", None)
    120+
    for flow in flow_job.findall(".//t:flow", namespaces=ns):
    121+
    flow_run._flow_id = flow.get("id", None)
    122+
    flow_run._started_at = created_at or started_at
    105123
    return cls(
    106124
    id_,
    107125
    type_,
    @@ -112,6 +130,7 @@ def _parse_element(cls, element, ns):
    112130
    finish_code,
    113131
    notes,
    114132
    mode,
    133+
    flow_run,
    115134
    )
    116135

    117136

    tableauserverclient/server/__init__.py

    Lines changed: 1 addition & 0 deletions
    Original file line numberDiff line numberDiff line change
    @@ -32,6 +32,7 @@
    3232
    ColumnItem,
    3333
    FlowItem,
    3434
    WebhookItem,
    35+
    FlowRunItem
    3536
    )
    3637
    from .endpoint import (
    3738
    Auth,

    tableauserverclient/server/endpoint/__init__.py

    Lines changed: 1 addition & 0 deletions
    Original file line numberDiff line numberDiff line change
    @@ -7,6 +7,7 @@
    77
    from .favorites_endpoint import Favorites
    88
    from .fileuploads_endpoint import Fileuploads
    99
    from .flows_endpoint import Flows
    10+
    from .flow_runs_endpoint import FlowRuns
    1011
    from .exceptions import (
    1112
    ServerResponseError,
    1213
    MissingRequiredFieldError,

    tableauserverclient/server/endpoint/exceptions.py

    Lines changed: 12 additions & 1 deletion
    Original file line numberDiff line numberDiff line change
    @@ -75,5 +75,16 @@ def __str__(self):
    7575
    return f"Job {self.job.id} failed with notes {self.notes}"
    7676

    7777

    78-
    class JobCanceledException(JobFailedException):
    78+
    class JobCancelledException(JobFailedException):
    7979
    pass
    80+
    class FlowRunFailedException(Exception):
    81+
    def __init__(self, flow_run):
    82+
    self.background_job_id = flow_run.background_job_id
    83+
    self.flow_run = flow_run
    84+
    85+
    def __str__(self):
    86+
    return f"FlowRun {self.flow_run.id} failed with job id {self.background_job_id}"
    87+
    88+
    89+
    class FlowRunCancelledException(FlowRunFailedException):
    90+
    pass
    Lines changed: 76 additions & 0 deletions
    Original file line numberDiff line numberDiff line change
    @@ -0,0 +1,76 @@
    1+
    from .endpoint import Endpoint, QuerysetEndpoint, api
    2+
    from .exceptions import FlowRunFailedException, FlowRunCancelledException
    3+
    from .. import FlowRunItem, PaginationItem
    4+
    from ...exponential_backoff import ExponentialBackoffTimer
    5+
    6+
    import logging
    7+
    8+
    logger = logging.getLogger("tableau.endpoint.flowruns")
    9+
    10+
    11+
    class FlowRuns(QuerysetEndpoint):
    12+
    def __init__(self, parent_srv):
    13+
    super(FlowRuns, self).__init__(parent_srv)
    14+
    15+
    @property
    16+
    def baseurl(self):
    17+
    return "{0}/sites/{1}/flows/runs".format(self.parent_srv.baseurl, self.parent_srv.site_id)
    18+
    19+
    # Get all flows
    20+
    @api(version="3.10")
    21+
    def get(self, req_options=None):
    22+
    logger.info("Querying all flow runs on site")
    23+
    url = self.baseurl
    24+
    server_response = self.get_request(url, req_options)
    25+
    pagination_item = PaginationItem.from_response(server_response.content, self.parent_srv.namespace)
    26+
    all_flow_run_items = FlowRunItem.from_response(server_response.content, self.parent_srv.namespace)
    27+
    return all_flow_run_items, pagination_item
    28+
    29+
    # Get 1 flow by id
    30+
    @api(version="3.10")
    31+
    def get_by_id(self, flow_run_id):
    32+
    if not flow_run_id:
    33+
    error = "Flow ID undefined."
    34+
    raise ValueError(error)
    35+
    logger.info("Querying single flow (ID: {0})".format(flow_run_id))
    36+
    url = "{0}/{1}".format(self.baseurl, flow_run_id)
    37+
    server_response = self.get_request(url)
    38+
    return FlowRunItem.from_response(server_response.content, self.parent_srv.namespace)[0]
    39+
    40+
    41+
    # Cancel 1 flow run by id
    42+
    @api(version="3.10")
    43+
    def cancel(self, flow_run_id):
    44+
    if not flow_run_id:
    45+
    error = "Flow ID undefined."
    46+
    raise ValueError(error)
    47+
    id_ = getattr(flow_run_id, 'id', flow_run_id)
    48+
    url = "{0}/{1}".format(self.baseurl, id_)
    49+
    self.put_request(url)
    50+
    logger.info("Deleted single flow (ID: {0})".format(id_))
    51+
    52+
    53+
    @api(version="3.10")
    54+
    def wait_for_job(self, flow_run_id, *, timeout=None):
    55+
    if isinstance(flow_run_id, FlowRunItem):
    56+
    flow_run_id = flow_run_id.id
    57+
    assert isinstance(flow_run_id, str)
    58+
    logger.debug(f"Waiting for flow run {flow_run_id}")
    59+
    60+
    backoffTimer = ExponentialBackoffTimer(timeout=timeout)
    61+
    flow_run = self.get_by_id(flow_run_id)
    62+
    while flow_run.completed_at is None:
    63+
    backoffTimer.sleep()
    64+
    flow_run = self.get_by_id(flow_run_id)
    65+
    logger.debug(f"\tFlowRun {flow_run_id} progress={flow_run.progress}")
    66+
    67+
    logger.info("FlowRun {} Completed: Status: {}".format(flow_run_id, flow_run.status))
    68+
    69+
    if flow_run.status == "Success":
    70+
    return flow_run
    71+
    elif flow_run.status == "Failed":
    72+
    raise FlowRunFailedException(flow_run)
    73+
    elif flow_run.status == "Cancelled":
    74+
    raise FlowRunCancelledException(flow_run)
    75+
    else:
    76+
    raise AssertionError("Unexpected status in flow_run", flow_run)

    tableauserverclient/server/endpoint/jobs_endpoint.py

    Lines changed: 3 additions & 2 deletions
    Original file line numberDiff line numberDiff line change
    @@ -1,5 +1,5 @@
    11
    from .endpoint import Endpoint, api
    2-
    from .exceptions import JobCanceledException, JobFailedException
    2+
    from .exceptions import JobCancelledException, JobFailedException
    33
    from .. import JobItem, BackgroundJobItem, PaginationItem
    44
    from ..request_options import RequestOptionsBase
    55
    from ...exponential_backoff import ExponentialBackoffTimer
    @@ -44,6 +44,7 @@ def get_by_id(self, job_id):
    4444
    new_job = JobItem.from_response(server_response.content, self.parent_srv.namespace)[0]
    4545
    return new_job
    4646

    47+
    @api(version="2.6")
    4748
    def wait_for_job(self, job_id, *, timeout=None):
    4849
    if isinstance(job_id, JobItem):
    4950
    job_id = job_id.id
    @@ -64,6 +65,6 @@ def wait_for_job(self, job_id, *, timeout=None):
    6465
    elif job.finish_code == JobItem.FinishCode.Failed:
    6566
    raise JobFailedException(job)
    6667
    elif job.finish_code == JobItem.FinishCode.Cancelled:
    67-
    raise JobCanceledException(job)
    68+
    raise JobCancelledException(job)
    6869
    else:
    6970
    raise AssertionError("Unexpected finish_code in job", job)

    tableauserverclient/server/server.py

    Lines changed: 2 additions & 0 deletions
    Original file line numberDiff line numberDiff line change
    @@ -25,6 +25,7 @@
    2525
    Favorites,
    2626
    DataAlerts,
    2727
    Fileuploads,
    28+
    FlowRuns
    2829
    )
    2930
    from .endpoint.exceptions import (
    3031
    EndpointUnavailableError,
    @@ -85,6 +86,7 @@ def __init__(self, server_address, use_server_version=False):
    8586
    self.data_alerts = DataAlerts(self)
    8687
    self.fileuploads = Fileuploads(self)
    8788
    self._namespace = Namespace()
    89+
    self.flow_runs = FlowRuns(self)
    8890

    8991
    if use_server_version:
    9092
    self.use_server_version()

    test/_utils.py

    Lines changed: 4 additions & 1 deletion
    Original file line numberDiff line numberDiff line change
    @@ -29,6 +29,9 @@ def sleep_mock(interval):
    2929
    def get_time():
    3030
    return mock_time
    3131

    32-
    patch = unittest.mock.patch
    32+
    try:
    33+
    patch = unittest.mock.patch
    34+
    except AttributeError:
    35+
    from unittest.mock import patch
    3336
    with patch("time.sleep", sleep_mock), patch("time.time", get_time):
    3437
    yield get_time

    test/assets/flow_refresh.xml

    Lines changed: 11 additions & 0 deletions
    Original file line numberDiff line numberDiff line change
    @@ -0,0 +1,11 @@
    1+
    <?xml version='1.0' encoding='UTF-8'?>
    2+
    <tsResponse xmlns="http://tableau.com/api" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tableau.com/api http://tableau.com/api/ts-api-3.5.xsd">
    3+
    <job id="d1b2ccd0-6dfa-444a-aee4-723dbd6b7c9d"
    4+
    mode="Asynchronous"
    5+
    type="RunFlow"
    6+
    createdAt="2018-05-22T13:00:29Z">
    7+
    <runFlowJobType flowRunId="e0c3067f-2333-4eee-8028-e0a56ca496f6">
    8+
    <flow id="92967d2d-c7e2-46d0-8847-4802df58f484" name="FlowOne"/>
    9+
    </runFlowJobType>
    10+
    </job>
    11+
    </tsResponse>

    test/assets/flow_runs_get.xml

    Lines changed: 19 additions & 0 deletions
    Original file line numberDiff line numberDiff line change
    @@ -0,0 +1,19 @@
    1+
    <tsResponse xmlns="http://tableau.com/api" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tableau.com/api http://tableau.com/api/ts-api-3.10.xsd">
    2+
    <pagination pageNumber="1" pageSize="100" totalAvailable="2"/>
    3+
    <flowRuns>
    4+
    <flowRuns id="cc2e652d-4a9b-4476-8c93-b238c45db968"
    5+
    flowId="587daa37-b84d-4400-a9a2-aa90e0be7837"
    6+
    status="Success"
    7+
    startedAt="2021-02-11T01:42:55Z"
    8+
    completedAt="2021-02-11T01:57:38Z"
    9+
    progress="100"
    10+
    backgroundJobId="aa23f4ac-906f-11e9-86fb-3f0f71412e77"/>
    11+
    <flowRuns id="a3104526-c0c6-4ea5-8362-e03fc7cbd7ee"
    12+
    flowId="5c36be69-eb30-461b-b66e-3e2a8e27cc35"
    13+
    status="Failed"
    14+
    startedAt="2021-02-13T04:05:30Z"
    15+
    completedAt="2021-02-13T04:05:35Z"
    16+
    progress="100"
    17+
    backgroundJobId="1ad21a9d-2530-4fbf-9064-efd3c736e023"/>
    18+
    </flowRuns>
    19+
    </tsResponse>

    test/assets/flow_runs_get_by_id.xml

    Lines changed: 10 additions & 0 deletions
    Original file line numberDiff line numberDiff line change
    @@ -0,0 +1,10 @@
    1+
    <tsResponse xmlns="http://tableau.com/api" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tableau.com/api http://tableau.com/api/ts-api-3.10.xsd">
    2+
    <flowRun id="cc2e652d-4a9b-4476-8c93-b238c45db968"
    3+
    flowId="587daa37-b84d-4400-a9a2-aa90e0be7837"
    4+
    status="Success"
    5+
    startedAt="2021-02-11T01:42:55Z"
    6+
    completedAt="2021-02-11T01:57:38Z"
    7+
    progress="100"
    8+
    backgroundJobId="1ad21a9d-2530-4fbf-9064-efd3c736e023">
    9+
    </flowRun>
    10+
    </tsResponse>

    0 commit comments

    Comments
     (0)
    0