feat: incremental refresh for extracts by jacalata · Pull Request #1545 · tableau/server-client-python · GitHub
[go: up one dir, main page]

Skip to content

feat: incremental refresh for extracts #1545

New issue
Merged
merged 5 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions samples/create_extract_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ def main():
help="desired logging level (set to error by default)",
)
# Options specific to this sample:
# This sample has no additional options, yet. If you add some, please add them here
parser.add_argument("resource_type", choices=["workbook", "datasource"])
parser.add_argument("resource_id")
parser.add_argument("--incremental", default=False)

args = parser.parse_args()

Expand All @@ -45,6 +47,7 @@ def main():
# Monthly Schedule
# This schedule will run on the 15th of every month at 11:30PM
monthly_interval = TSC.MonthlyInterval(start_time=time(23, 30), interval_value=15)
print(monthly_interval)
monthly_schedule = TSC.ScheduleItem(
None,
None,
Expand All @@ -53,18 +56,20 @@ def main():
monthly_interval,
)

# Default to using first workbook found in server
all_workbook_items, pagination_item = server.workbooks.get()
my_workbook: TSC.WorkbookItem = all_workbook_items[0]
my_workbook: TSC.WorkbookItem = server.workbooks.get_by_id(args.resource_id)

target_item = TSC.Target(
my_workbook.id, # the id of the workbook or datasource
"workbook", # alternatively can be "datasource"
)

extract_item = TSC.TaskItem(
refresh_type = "FullRefresh"
if args.incremental:
refresh_type = "Incremental"

scheduled_extract_item = TSC.TaskItem(
None,
"FullRefresh",
refresh_type,
None,
None,
None,
Expand All @@ -74,7 +79,7 @@ def main():
)

try:
response = server.tasks.create(extract_item)
response = server.tasks.create(scheduled_extract_item)
print(response)
except Exception as e:
print(e)
Expand Down
46 changes: 37 additions & 9 deletions samples/extracts.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@ def main():
help="desired logging level (set to error by default)",
)
# Options specific to this sample
parser.add_argument("--delete")
parser.add_argument("--create")
parser.add_argument("--create", action="store_true")
parser.add_argument("--delete", action="store_true")
parser.add_argument("--refresh", action="store_true")
parser.add_argument("--workbook", required=False)
parser.add_argument("--datasource", required=False)
args = parser.parse_args()

# Set logging level based on user input, or error by default
Expand All @@ -39,20 +42,45 @@ def main():
server.add_http_options({"verify": False})
server.use_server_version()
with server.auth.sign_in(tableau_auth):
# Gets all workbook items
all_workbooks, pagination_item = server.workbooks.get()
print(f"\nThere are {pagination_item.total_available} workbooks on site: ")
print([workbook.name for workbook in all_workbooks])

if all_workbooks:
# Pick one workbook from the list
wb = all_workbooks[3]
wb = None
ds = None
if args.workbook:
wb = server.workbooks.get_by_id(args.workbook)
if wb is None:
raise ValueError(f"Workbook not found for id {args.workbook}")
elif args.datasource:
ds = server.datasources.get_by_id(args.datasource)
if ds is None:
raise ValueError(f"Datasource not found for id {args.datasource}")
else:
# Gets all workbook items
all_workbooks, pagination_item = server.workbooks.get()
print(f"\nThere are {pagination_item.total_available} workbooks on site: ")
print([workbook.name for workbook in all_workbooks])

if all_workbooks:
# Pick one workbook from the list
wb = all_workbooks[3]

if args.create:
print("create extract on wb ", wb.name)
extract_job = server.workbooks.create_extract(wb, includeAll=True)
print(extract_job)

if args.refresh:
extract_job = None
if ds is not None:
print(f"refresh extract on datasource {ds.name}")
extract_job = server.datasources.refresh(ds, includeAll=True, incremental=True)
elif wb is not None:
print(f"refresh extract on workbook {wb.name}")
extract_job = server.workbooks.refresh(wb)
else:
print("no content item selected to refresh")

print(extract_job)

if args.delete:
print("delete extract on wb ", wb.name)
jj = server.workbooks.delete_extract(wb)
Expand Down
2 changes: 1 addition & 1 deletion samples/publish_workbook.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def main():

# Step 1: Sign in to server.
tableau_auth = TSC.PersonalAccessTokenAuth(args.token_name, args.token_value, site_id=args.site)
server = TSC.Server(args.server, use_server_version=True)
server = TSC.Server(args.server, use_server_version=True, http_options={"verify": False})
with server.auth.sign_in(tableau_auth):
# Step2: Retrieve the project id, if a project name was passed
if args.project is not None:
Expand Down
33 changes: 25 additions & 8 deletions samples/refresh.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,34 +27,51 @@ def main():
# Options specific to this sample
parser.add_argument("resource_type", choices=["workbook", "datasource"])
parser.add_argument("resource_id")
parser.add_argument("--incremental")
parser.add_argument("--synchronous")

args = parser.parse_args()

# Set logging level based on user input, or error by default
logging_level = getattr(logging, args.logging_level.upper())
logging.basicConfig(level=logging_level)

refresh_type = "FullRefresh"
incremental = False
if args.incremental:
refresh_type = "Incremental"
incremental = True

tableau_auth = TSC.PersonalAccessTokenAuth(args.token_name, args.token_value, site_id=args.site)
server = TSC.Server(args.server, use_server_version=True)
server = TSC.Server(args.server, use_server_version=True, http_options={"verify": False})
with server.auth.sign_in(tableau_auth):
if args.resource_type == "workbook":
# Get the workbook by its Id to make sure it exists
resource = server.workbooks.get_by_id(args.resource_id)
print(resource)

# trigger the refresh, you'll get a job id back which can be used to poll for when the refresh is done
job = server.workbooks.refresh(args.resource_id)
job = server.workbooks.refresh(args.resource_id, incremental=incremental)
else:
# Get the datasource by its Id to make sure it exists
resource = server.datasources.get_by_id(args.resource_id)
print(resource)

# server.datasources.create_extract(resource)

# trigger the refresh, you'll get a job id back which can be used to poll for when the refresh is done
job = server.datasources.refresh(resource)
job = server.datasources.refresh(resource, incremental=incremental) # by default runs as a sync task,

print(f"Update job posted (ID: {job.id})")
print("Waiting for job...")
# `wait_for_job` will throw if the job isn't executed successfully
job = server.jobs.wait_for_job(job)
print("Job finished succesfully")
print(f"{refresh_type} job posted (ID: {job.id})")
if args.synchronous:
# equivalent to tabcmd --synchronous: wait for the job to complete
try:
# `wait_for_job` will throw if the job isn't executed successfully
print("Waiting for job...")
server.jobs.wait_for_job(job)
print("Job finished succesfully")
except Exception as e:
print(f"Job failed! {e}")


if __name__ == "__main__":
Expand Down
6 changes: 3 additions & 3 deletions tableauserverclient/server/endpoint/datasources_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,11 +187,11 @@ def update_connection(
return connection

@api(version="2.8")
def refresh(self, datasource_item: DatasourceItem) -> JobItem:
def refresh(self, datasource_item: DatasourceItem, incremental: bool = False) -> JobItem:
id_ = getattr(datasource_item, "id", datasource_item)
url = f"{self.baseurl}/{id_}/refresh"
empty_req = RequestFactory.Empty.empty_req()
server_response = self.post_request(url, empty_req)
refresh_req = RequestFactory.Task.refresh_req(incremental)
server_response = self.post_request(url, refresh_req)
new_job = JobItem.from_response(server_response.content, self.parent_srv.namespace)[0]
return new_job

Expand Down
8 changes: 5 additions & 3 deletions tableauserverclient/server/endpoint/workbooks_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,16 @@ def get_by_id(self, workbook_id: str) -> WorkbookItem:
return WorkbookItem.from_response(server_response.content, self.parent_srv.namespace)[0]

@api(version="2.8")
def refresh(self, workbook_item: Union[WorkbookItem, str]) -> JobItem:
def refresh(self, workbook_item: Union[WorkbookItem, str], incremental: bool = False) -> JobItem:
"""
Refreshes the extract of an existing workbook.

Parameters
----------
workbook_item : WorkbookItem | str
The workbook item or workbook ID.
incremental: bool
Whether to do a full refresh or incremental refresh of the extract data

Returns
-------
Expand All @@ -134,8 +136,8 @@ def refresh(self, workbook_item: Union[WorkbookItem, str]) -> JobItem:
"""
id_ = getattr(workbook_item, "id", workbook_item)
url = f"{self.baseurl}/{id_}/refresh"
empty_req = RequestFactory.Empty.empty_req()
server_response = self.post_request(url, empty_req)
refresh_req = RequestFactory.Task.refresh_req(incremental)
server_response = self.post_request(url, refresh_req)
new_job = JobItem.from_response(server_response.content, self.parent_srv.namespace)[0]
return new_job

Expand Down
7 changes: 7 additions & 0 deletions tableauserverclient/server/request_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -1117,6 +1117,13 @@ def run_req(self, xml_request: ET.Element, task_item: Any) -> None:
# Send an empty tsRequest
pass

@_tsrequest_wrapped
def refresh_req(self, xml_request: ET.Element, incremental: bool = False) -> bytes:
    """Build the request body for an extract refresh.

    Appends an ``extractRefresh`` child to ``xml_request`` and, when
    ``incremental`` is truthy, marks it with ``incremental="true"``.
    For a full refresh the attribute is omitted entirely.

    Parameters
    ----------
    xml_request : ET.Element
        Root element the ``extractRefresh`` child is attached to. Callers
        pass only ``incremental``, so this is presumably supplied by the
        ``_tsrequest_wrapped`` decorator -- confirm against its definition.
    incremental : bool
        Whether to request an incremental rather than a full refresh.

    Returns
    -------
    bytes
        ``xml_request`` serialized with ``ET.tostring``.
    """
    refresh_element = ET.SubElement(xml_request, "extractRefresh")
    if incremental:
        # Only emitted for incremental refreshes; absence means full refresh.
        refresh_element.set("incremental", "true")
    return ET.tostring(xml_request)

@_tsrequest_wrapped
def create_extract_req(self, xml_request: ET.Element, extract_item: "TaskItem") -> bytes:
extract_element = ET.SubElement(xml_request, "extractRefresh")
Expand Down
Loading
0