From 0490b5b9dafde561283ab6ee9348ae4a6aaf61cd Mon Sep 17 00:00:00 2001 From: Nikola Jelic Date: Tue, 20 Jan 2026 16:53:57 +0100 Subject: [PATCH 1/6] Update error handling to accept status codes for auth errors --- cxreports_api_client/v1/client.py | 43 ++++++++++++++++++------------- 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/cxreports_api_client/v1/client.py b/cxreports_api_client/v1/client.py index 80691f4..f916710 100644 --- a/cxreports_api_client/v1/client.py +++ b/cxreports_api_client/v1/client.py @@ -48,6 +48,24 @@ def wrapper(*args, **kwargs): raise RuntimeError(f"An error occurred: {req_err}") from req_err return wrapper + def __check_authentication(self, response): + """ + Check if the response indicates an authentication failure. + + Args: + response: The response object from requests. + + Raises: + RuntimeError: If the response indicates unauthenticated access. + """ + # Check status code first (401 = Unauthorized, 403 = Forbidden) + if response.status_code in (401, 403): + raise RuntimeError("Unauthenticated.") + + # Fallback: Check content type for HTML (backwards compatibility) + if response.headers.get('Content-Type', '').lower().startswith('text/html'): + raise RuntimeError("Unauthenticated.") + @__handle_requests_exceptions def get_pdf(self, reportId: int, params: dict = None, workspace_id:int = None): """ @@ -72,8 +90,7 @@ def get_pdf(self, reportId: int, params: dict = None, workspace_id:int = None): response = requests.get(url, headers=headers, verify=False) response.raise_for_status() - if response.headers.get('Content-Type', '').lower().startswith('text/html'): - raise RuntimeError("Unauthenticated.") + self.__check_authentication(response) if "application/pdf" not in response.headers.get("Content-Type", "").lower(): raise RuntimeError("Invalid content type, expected PDF") @@ -88,9 +105,7 @@ def get_report_types(self, workspace_id:int = None): response = requests.get(url, headers=headers, verify=False) response.raise_for_status() - # Check if the response content is HTML - if response.headers.get('Content-Type', '').lower().startswith('text/html'): - raise RuntimeError("Unauthenticated.") + self.__check_authentication(response) return response.json() @@ -110,9 +125,7 @@ def get_workspaces(self): response = requests.get(url, headers=headers, verify=False) response.raise_for_status() - # Check if the response content is HTML - if response.headers.get('Content-Type', '').lower().startswith('text/html'): - raise RuntimeError("Unauthenticated.") + self.__check_authentication(response) return response.json() @@ -135,9 +148,7 @@ def get_reports(self, type: str, workspace_id:int = None): response = requests.get(url, headers=headers, verify=False) response.raise_for_status() - # Check if the response content is HTML - if response.headers.get('Content-Type', '').lower().startswith('text/html'): - raise RuntimeError("Unauthenticated.") + self.__check_authentication(response) return response.json() @@ -157,9 +168,7 @@ def create_auth_token(self): response = requests.post(url, headers=headers, verify=False) response.raise_for_status() - # Check if the response content is HTML - if response.headers.get('Content-Type', '').lower().startswith('text/html'): - raise RuntimeError("Unauthenticated.") + self.__check_authentication(response) return response.json() @@ -186,10 +195,8 @@ def push_temporary_data(self, data:dict): response = requests.post(url, headers=headers, json=data, verify=False) response.raise_for_status() - - # If content type is HTML, then the user is not authenticated - if response.headers.get('Content-Type', '').lower().startswith('text/html'): - raise RuntimeError("Unauthenticated.") + + self.__check_authentication(response) return response.json() From 2a25c7cc41fd4acd1913c68d1c1e6526b2ca9cf1 Mon Sep 17 00:00:00 2001 From: Nikola Jelic Date: Wed, 21 Jan 2026 16:07:29 +0100 Subject: [PATCH 2/6] Add methods for report management and export handling in CxReportClientV1 --- cxreports_api_client/v1/client.py | 300 +++++++++++++++++++++++++++++- 1 file changed, 292 insertions(+), 8 deletions(-) diff --git a/cxreports_api_client/v1/client.py b/cxreports_api_client/v1/client.py index f916710..1f2d614 100644 --- a/cxreports_api_client/v1/client.py +++ b/cxreports_api_client/v1/client.py @@ -152,6 +152,245 @@ def get_reports(self, type: str, workspace_id:int = None): return response.json() + @__handle_requests_exceptions + def get_report_pages(self, report_id:int, workspace_id: int = None): + """ + Get the list of pages for a specific report. + + Args: + report_id (int): The ID of the report. + workspace_id (int, optional): The workspace ID. If not provided, uses the default workspace. + + Returns: + List[Dict[str, Any]]: A list of report pages with their metadata. + + Raises: + RuntimeError: If any request or processing error occurs. + """ + headers = self.__get_headers() + url = self.__get_url_with_workspace(f"reports/{report_id}/pages", workspace_id) + response = requests.get(url, headers=headers, verify=False) + response.raise_for_status() + + self.__check_authentication(response) + + return response.json() + + + @__handle_requests_exceptions + def post_pdf(self, reportId: int, request_body: dict = None, workspace_id: int = None): + """ + Export a report to PDF using POST method (allows passing data in request body). + + Args: + reportId (int): The ID of the report or report type code. + request_body (Optional[Dict[str, Any]]): Request body containing: + - params (dict): Report parameters + - data (dict): JSON data to be passed to the report + - lang (str): Preferred language of the report + - timezone (str): Preferred timezone + - format (str): Document format (default: "pdf") + - includeAttachments (bool): Whether to include attachments + workspace_id (int, optional): The workspace ID. + + Returns: + bytes: The PDF content. + + Raises: + RuntimeError: If any request or processing error occurs. + """ + headers = self.__get_headers() + headers["Content-Type"] = "application/json" + + url = self.__get_url_with_workspace(f"reports/{reportId}/pdf", workspace_id) + + if request_body is None: + request_body = {} + + if "format" not in request_body: + request_body["format"] = "pdf" + + response = requests.post(url, headers=headers, json=request_body, verify=False) + response.raise_for_status() + + self.__check_authentication(response) + + if "application/pdf" not in response.headers.get("Content-Type", "").lower(): + raise RuntimeError("Invalid content type, expected PDF") + + return response.content + + @__handle_requests_exceptions + def start_report_export(self, reportId: int, request_body: dict = None, workspace_id: int = None): + """ + Start asynchronous report generation. + + Args: + reportId (int): The ID of the report or report type code. + request_body (Optional[Dict[str, Any]]): Request body containing: + - params (dict): Report parameters + - data (dict): JSON data to be passed to the report + - lang (str): Preferred language of the report + - timezone (str): Preferred timezone + - format (str): Document format (default: "pdf") + - includeAttachments (bool): Whether to zip the report and data attachments + - excludePages (list[int]): Array of page numbers to exclude from the report + - tempDataId (int): ID of the temporary data object + workspace_id (int, optional): The workspace ID. + + Returns: + Dict[str, Any]: Response containing temporaryFileId for polling status. + + Raises: + RuntimeError: If any request or processing error occurs. + """ + headers = self.__get_headers() + headers["Content-Type"] = "application/json" + + url = self.__get_url_with_workspace(f"reports/{reportId}/export", workspace_id) + + if request_body is None: + request_body = {} + + if "format" not in request_body: + request_body["format"] = "pdf" + + response = requests.post(url, headers=headers, json=request_body, verify=False) + + if response.status_code != 202: + response.raise_for_status() + + self.__check_authentication(response) + + return response.json() + + @__handle_requests_exceptions + def get_jobs(self, workspace_id: int = None): + """ + Get the list of all jobs in the workspace. + + Args: + workspace_id (int, optional): The workspace ID. If not provided, uses the default workspace. + + Returns: + List[Dict[str, Any]]: A list of jobs with their configurations and metadata. + + Raises: + RuntimeError: If any request or processing error occurs. + """ + headers = self.__get_headers() + url = self.__get_url_with_workspace(f"jobs", workspace_id) + response = requests.get(url, headers=headers, verify=False) + response.raise_for_status() + self.__check_authentication(response) + return response.json() + + @__handle_requests_exceptions + def start_job_run(self, job_id: int, request_body: dict = None, workspace_id: int = None): + """ + Start a new job run for the specified job. + + Args: + job_id (int): The ID of the job to run. + request_body (Optional[Dict[str, Any]]): Request body containing: + - params (dict): Job parameters + - data (dict): JSON data to be passed to the job + workspace_id (int, optional): The workspace ID. If not provided, uses the default workspace. + + Returns: + Dict[str, Any]: Response containing jobRunId for tracking the run. + + Raises: + RuntimeError: If any request or processing error occurs. + """ + headers = self.__get_headers() + headers["Content-Type"] = "application/json" + url = self.__get_url_with_workspace(f"jobs/{job_id}/runs", workspace_id) + if request_body is None: + request_body = {} + response = requests.post(url, headers=headers, json=request_body, verify=False) + response.raise_for_status() + self.__check_authentication(response) + return response.json() + + @__handle_requests_exceptions + def get_job_run_status(self, job_id: int, run_id: int, workspace_id: int = None): + """ + Get the status of a specific job run. + + Args: + job_id (int): The ID of the job. + run_id (int): The ID of the job run. + workspace_id (int, optional): The workspace ID. If not provided, uses the default workspace. + + Returns: + Dict[str, Any]: Job run status information including: + - finished (bool): Whether the job run has completed + - entries (int): Total number of entries + - status (dict): Breakdown of entry statuses (queued, review, completed, errors) + + Raises: + RuntimeError: If any request or processing error occurs. + """ + headers = self.__get_headers() + url = self.__get_url_with_workspace(f"jobs/{job_id}/runs/{run_id}/status", workspace_id) + response = requests.get(url, headers=headers, verify=False) + response.raise_for_status() + self.__check_authentication(response) + return response.json() + + @__handle_requests_exceptions + def get_run_review_document(self, job_id: int, run_id: int, workspace_id: int = None): + """ + Generate a review document for entries that require review in a job run. + + This is typically used when a job has reviewRequired=true and there are entries + with status 'review' that need to be examined before delivery. + + Args: + job_id (int): The ID of the job. + run_id (int): The ID of the job run. + workspace_id (int, optional): The workspace ID. If not provided, uses the default workspace. + + Returns: + Dict[str, Any]: Response containing temporaryFileId for downloading the review document. + + Raises: + RuntimeError: If any request or processing error occurs. + """ + headers = self.__get_headers() + url = self.__get_url_with_workspace(f"jobs/{job_id}/runs/{run_id}/generate-review-document", workspace_id) + response = requests.post(url, headers=headers, verify=False) + response.raise_for_status() + self.__check_authentication(response) + return response.json() + + @__handle_requests_exceptions + def deliver_job_run_entries(self, job_id: int, run_id: int, workspace_id: int = None): + """ + Deliver (finalize) the entries from a completed job run. + + This should be called after the job run is finished and any required reviews + are completed. It triggers the final delivery of all job run entries. + + Args: + job_id (int): The ID of the job. + run_id (int): The ID of the job run. + workspace_id (int, optional): The workspace ID. If not provided, uses the default workspace. + + Returns: + Dict[str, Any]: Delivery confirmation response. + + Raises: + RuntimeError: If any request or processing error occurs. + """ + headers = self.__get_headers() + url = self.__get_url_with_workspace(f"jobs/{job_id}/runs/{run_id}/deliver", workspace_id) + response = requests.post(url, headers=headers, verify=False) + response.raise_for_status() + self.__check_authentication(response) + return response.json() + @__handle_requests_exceptions def create_auth_token(self): """ @@ -220,9 +459,55 @@ def get_preview_url(self, report_id:int, query_params:dict = None, workspace_id: query_params['nonce'] = nonce_token url = self.__append_query_params(url, query_params) return url - - - + + @__handle_requests_exceptions + def get_export_status(self, temp_file_id: int, workspace_id: int = None): + """ + Get the status of an asynchronous export. + + Args: + temp_file_id (int): The temporary file ID from the export request. + workspace_id (int, optional): The workspace ID. + + Returns: + Dict[str, Any]: The export status information. + + Raises: + RuntimeError: If any request or processing error occurs. + """ + headers = self.__get_headers() + url = self.__get_url_with_workspace(f"exports/{temp_file_id}/status", workspace_id) + response = requests.get(url, headers=headers, verify=False) + response.raise_for_status() + + self.__check_authentication(response) + + return response.json() + + @__handle_requests_exceptions + def get_export_content(self, temp_file_id: int, workspace_id: int = None): + """ + Download the content of a completed export. + + Args: + temp_file_id (int): The temporary file ID from the export request. + workspace_id (int, optional): The workspace ID. + + Returns: + bytes: The exported file content. + + Raises: + RuntimeError: If any request or processing error occurs. + """ + headers = self.__get_headers() + url = self.__get_url_with_workspace(f"exports/{temp_file_id}/content", workspace_id) + response = requests.get(url, headers=headers, verify=False) + response.raise_for_status() + + self.__check_authentication(response) + + return response.content + def __append_query_params(self, url: str, query_params: Dict[str, Any]) -> str: """ Append query parameters to the URL. @@ -234,7 +519,7 @@ def __append_query_params(self, url: str, query_params: Dict[str, Any]) -> str: Returns: str: The URL with query parameters attached. """ - + if query_params is None: return url params = {} @@ -246,16 +531,15 @@ def __append_query_params(self, url: str, query_params: Dict[str, Any]) -> str: json_params = json.dumps(query_params['params']) encoded_params = base64.urlsafe_b64encode(json_params.encode()).decode() params['params'] = encoded_params - + if 'nonce' in query_params and isinstance(query_params['nonce'], str): params['nonce'] = query_params['nonce'] - + if 'data' in query_params and isinstance(query_params['data'], str): params['data'] = query_params['data'] - + if 'timezone' in query_params and isinstance(query_params['timezone'], str): params['timezone'] = query_params['timezone'] return f"{url}?{urllib.parse.urlencode(params)}" - From 0cebc5fb7140499da8b3a8e0cf9e39c05b4d4b7f Mon Sep 17 00:00:00 2001 From: Nikola Jelic Date: Fri, 23 Jan 2026 14:59:57 +0100 Subject: [PATCH 3/6] Add support for additional query parameters in __append_query_params method --- cxreports_api_client/v1/client.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/cxreports_api_client/v1/client.py b/cxreports_api_client/v1/client.py index 1f2d614..15d44df 100644 --- a/cxreports_api_client/v1/client.py +++ b/cxreports_api_client/v1/client.py @@ -541,5 +541,14 @@ def __append_query_params(self, url: str, query_params: Dict[str, Any]) -> str: if 'timezone' in query_params and isinstance(query_params['timezone'], str): params['timezone'] = query_params['timezone'] + if 'lang' in query_params and isinstance(query_params['lang'], str): + params['lang'] = query_params['lang'] + + if 'type' in query_params and isinstance(query_params['type'], str): + params['type'] = query_params['type'] + + if 'includeAttachments' in query_params and isinstance(query_params['includeAttachments'], bool): + params['includeAttachments'] = query_params['includeAttachments'] + return f"{url}?{urllib.parse.urlencode(params)}" From 273bc01e5b09e1d4067e57cdfc9090f7e8176d55 Mon Sep 17 00:00:00 2001 From: Nikola Jelic Date: Fri, 23 Jan 2026 15:04:04 +0100 Subject: [PATCH 4/6] Refactor naming and imports for consistency --- cxreports_api_client/v1/client.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/cxreports_api_client/v1/client.py b/cxreports_api_client/v1/client.py index 15d44df..52d13b3 100644 --- a/cxreports_api_client/v1/client.py +++ b/cxreports_api_client/v1/client.py @@ -3,7 +3,7 @@ import urllib.parse import json from functools import wraps -from typing import Optional, Dict, Any, List +from typing import Dict, Any class CxReportClientV1: @@ -67,7 +67,7 @@ def __check_authentication(self, response): raise RuntimeError("Unauthenticated.") @__handle_requests_exceptions - def get_pdf(self, reportId: int, params: dict = None, workspace_id:int = None): + def get_pdf(self, report_id: int, params: dict = None, workspace_id:int = None): """ Fetch a PDF report. @@ -82,7 +82,7 @@ def get_pdf(self, reportId: int, params: dict = None, workspace_id:int = None): RuntimeError: If any request or processing error occurs. """ headers = self.__get_headers() - url = self.__get_url_with_workspace(f"reports/{reportId}/pdf", workspace_id) + url = self.__get_url_with_workspace(f"reports/{report_id}/pdf", workspace_id) url = self.__append_query_params(url, params) print(url) @@ -135,7 +135,7 @@ def get_reports(self, type: str, workspace_id:int = None): Fetch the list of reports by type. Args: - report_type (str): The type of report to fetch. + type (str): The type of report to fetch. Returns: List[Dict[str, Any]]: A list of reports. @@ -178,12 +178,12 @@ def get_report_pages(self, report_id:int, workspace_id: int = None): @__handle_requests_exceptions - def post_pdf(self, reportId: int, request_body: dict = None, workspace_id: int = None): + def post_pdf(self, report_id: int, request_body: dict = None, workspace_id: int = None): """ Export a report to PDF using POST method (allows passing data in request body). Args: - reportId (int): The ID of the report or report type code. + report_id (int): The ID of the report or report type code. request_body (Optional[Dict[str, Any]]): Request body containing: - params (dict): Report parameters - data (dict): JSON data to be passed to the report @@ -202,7 +202,7 @@ def post_pdf(self, reportId: int, request_body: dict = None, workspace_id: int = headers = self.__get_headers() headers["Content-Type"] = "application/json" - url = self.__get_url_with_workspace(f"reports/{reportId}/pdf", workspace_id) + url = self.__get_url_with_workspace(f"reports/{report_id}/pdf", workspace_id) if request_body is None: request_body = {} @@ -221,12 +221,12 @@ def post_pdf(self, reportId: int, request_body: dict = None, workspace_id: int = return response.content @__handle_requests_exceptions - def start_report_export(self, reportId: int, request_body: dict = None, workspace_id: int = None): + def start_report_export(self, report_id: int, request_body: dict = None, workspace_id: int = None): """ Start asynchronous report generation. Args: - reportId (int): The ID of the report or report type code. + report_id (int): The ID of the report or report type code. request_body (Optional[Dict[str, Any]]): Request body containing: - params (dict): Report parameters - data (dict): JSON data to be passed to the report @@ -247,7 +247,7 @@ def start_report_export(self, reportId: int, request_body: dict = None, workspac headers = self.__get_headers() headers["Content-Type"] = "application/json" - url = self.__get_url_with_workspace(f"reports/{reportId}/export", workspace_id) + url = self.__get_url_with_workspace(f"reports/{report_id}/export", workspace_id) if request_body is None: request_body = {} From 7fb1188fe2e5da9a180808fe211f988d779a6041 Mon Sep 17 00:00:00 2001 From: Nikola Jelic Date: Fri, 23 Jan 2026 15:24:08 +0100 Subject: [PATCH 5/6] Fix URL initialization by stripping whitespace in CxReportClientV1 constructor --- cxreports_api_client/v1/client.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cxreports_api_client/v1/client.py b/cxreports_api_client/v1/client.py index 52d13b3..64347e5 100644 --- a/cxreports_api_client/v1/client.py +++ b/cxreports_api_client/v1/client.py @@ -18,7 +18,9 @@ class CxReportClientV1: token (str): The authentication token used for API requests. """ def __init__(self, base_url:str, default_workspace_id:int, token:str): - self.url = base_url + self.url = base_url.strip().strip("/") + self.workspace_id = default_workspace_id + self.token = token self.token = token self.workspace_id = default_workspace_id From 1bb71969de2acf593ac53d5b7a1d85614388d95e Mon Sep 17 00:00:00 2001 From: Nikola Jelic Date: Fri, 23 Jan 2026 15:27:29 +0100 Subject: [PATCH 6/6] Add example usage script for CxReportClientV1 with environment variable configuration --- examples/example_usage.py | 303 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 303 insertions(+) create mode 100644 examples/example_usage.py diff --git a/examples/example_usage.py b/examples/example_usage.py new file mode 100644 index 0000000..3ba0aab --- /dev/null +++ b/examples/example_usage.py @@ -0,0 +1,303 @@ +""" +Example usage of CxReportClientV1 + +Configure the .env file in the project root with your credentials: +- CX_REPORT_URL: Base URL of the CxReports server +- CX_REPORT_TOKEN: Authentication token +- CX_REPORT_WORKSPACE: Workspace ID +- CX_REPORT_ID: Report ID to generate PDF +""" + +import os +import time +import json +from dotenv import load_dotenv # pip install python-dotenv + +from cxreports_api_client.v1.client import CxReportClientV1 + +load_dotenv() + +# Configuration from .env +def get_env_var(name: str, cast_type=None, required=True): + value = os.getenv(name) + if required and not value: + raise ValueError(f"Missing required environment variable: {name}") + if cast_type and value is not None: + try: + value = cast_type(value) + except Exception: + raise ValueError(f"Environment variable {name} must be of type {cast_type.__name__}") + return value + +# Configuration from .env +URL = get_env_var("CX_REPORT_URL") +TOKEN = get_env_var("CX_REPORT_TOKEN") +WORKSPACE = get_env_var("CX_REPORT_WORKSPACE", cast_type=int) +REPORT_ID = get_env_var("CX_REPORT_ID", cast_type=int) + +def main(): + """Run example usage of CxReportClientV1""" + + # Initialize the client + print(f"Connecting to {URL} with workspace {WORKSPACE}...") + client = CxReportClientV1(URL, WORKSPACE, TOKEN) + + # ========== WORKSPACES ========== + print("\n=== WORKSPACES ===") + try: + workspaces = client.get_workspaces() + print(json.dumps(workspaces, indent=2)) + except Exception as e: + print(f"Error: {e}") + + # ========== REPORT TYPES ========== + print("\n=== REPORT TYPES ===") + try: + report_types = client.get_report_types() + print(json.dumps(report_types, indent=2)) + except Exception as e: + print(f"Error: {e}") + + # ========== REPORTS ========== + print("\n=== REPORTS ===") + try: + reports = client.get_reports(type="all") + print(json.dumps(reports, indent=2)) + except Exception as e: + print(f"Error: {e}") + + # ========== REPORT PAGES ========== + print(f"\n=== REPORT PAGES (Report ID: {REPORT_ID}) ===") + try: + pages = client.get_report_pages(REPORT_ID) + print(json.dumps(pages, indent=2)) + except Exception as e: + print(f"Error: {e}") + + # ========== TEMPORARY DATA ========== + print("\n=== PUSHING TEMPORARY DATA ===") + temp_data_id = None + try: + invoice_data = { + "invoice": { + "invoiceNumber": "12345", + "dateIssued": "2024-01-27", + "dueDate": "2024-02-10", + "issuer": { + "name": "Test Corporation", + "address": "123 Business Rd, Business City, BC 12345", + "phone": "123-456-7890", + "email": "contact@xyzcorporation.com" + }, + "recipient": { + "name": "TEST Enterprises", + "address": "456 Enterprise Blvd, Commerce City, CC 67890", + "phone": "987-654-3210", + "email": "info@abcenterprises.com" + }, + "items": [ + { + "description": "Product 1", + "quantity": 10, + "unitPrice": 29.99, + "total": 299.90 + }, + { + "description": "Product 2", + "quantity": 5, + "unitPrice": 49.99, + "total": 249.95 + } + ], + "subTotal": 549.85, + "taxRate": 0.07, + "taxAmount": 38.49, + "total": 588.34, + "notes": "Thank you for your business. Passing data works!" + } + } + temp_data = client.push_temporary_data(invoice_data) + temp_data_id = temp_data.get('tempDataId') + print(f"Temporary Data ID: {temp_data_id}") + except Exception as e: + print(f"Error: {e}") + + # ========== GET PDF (Simple) ========== + print(f"\n=== DOWNLOADING PDF (Simple) - Report ID: {REPORT_ID} ===") + try: + pdf = client.get_pdf(REPORT_ID) + with open("./report1.pdf", 'wb') as pdf_file: + pdf_file.write(pdf) + print(f"Saved to report1.pdf (size: {len(pdf)} bytes)") + except Exception as e: + print(f"Error: {e}") + + # ========== GET PDF (With Temp Data) ========== + if temp_data_id: + print(f"\n=== DOWNLOADING PDF (With Temp Data) ===") + try: + pdf = client.get_pdf(REPORT_ID, { + "tempDataId": temp_data_id, + "timezone": "UTC" + }) + with open("./report2.pdf", 'wb') as pdf_file: + pdf_file.write(pdf) + print(f"Saved to report2.pdf (size: {len(pdf)} bytes)") + except Exception as e: + print(f"Error: {e}") + + # ========== POST PDF ========== + print(f"\n=== DOWNLOADING PDF (POST with Data) ===") + try: + request_body = { + "data": invoice_data, + "timezone": "UTC", + "format": "pdf" + } + pdf = client.post_pdf(REPORT_ID, request_body) + with open("./report3.pdf", 'wb') as pdf_file: + pdf_file.write(pdf) + print(f"Saved to report3.pdf (size: {len(pdf)} bytes)") + except Exception as e: + print(f"Error: {e}") + + # ========== PREVIEW URL ========== + print("\n=== PREVIEW URL ===") + try: + preview_url = client.get_preview_url(REPORT_ID, { + "tempDataId": temp_data_id + } if temp_data_id else None) + print(f"Preview URL: {preview_url}") + except Exception as e: + print(f"Error: {e}") + + # ========== ASYNC EXPORT ========== + print(f"\n=== ASYNC EXPORT ===") + try: + export_request = { + "data": invoice_data, + "format": "pdf", + "timezone": "UTC", + "includeAttachments": False + } + export_response = client.start_report_export(REPORT_ID, export_request) + temp_file_id = export_response['temporaryFileId'] + print(f"Export started. Temporary File ID: {temp_file_id}") + + # ========== POLLING EXPORT STATUS ========== + print("\n=== POLLING EXPORT STATUS ===") + max_attempts = 30 + attempt = 0 + export_ready = False + + while not export_ready and attempt < max_attempts: + time.sleep(2) # Wait 2 seconds between polls + + status = client.get_export_status(temp_file_id) + attempt += 1 + print(f"Attempt {attempt}: Status={status.get('status')}, IsReady={status.get('isReady')}") + + if status.get('isReady'): + export_ready = True + print(f"File ready! Name: {status.get('name')}, Size: {status.get('contentSize')} bytes") + elif status.get('status') == 'Failed': + print(f"Export failed: {status.get('errorMessage')}") + break + + # ========== DOWNLOADING EXPORTED FILE ========== + if export_ready: + print("\n=== DOWNLOADING EXPORTED FILE ===") + content = client.get_export_content(temp_file_id) + with open("./report_async.pdf", 'wb') as f: + f.write(content) + print(f"Saved to report_async.pdf (size: {len(content)} bytes)") + + except Exception as e: + print(f"Error: {e}") + + # ========== NONCE TOKEN ========== + print("\n=== NONCE TOKEN ===") + try: + nonce = client.create_auth_token() + print(f"Nonce Token: {nonce.get('nonce')}") + print("(This can be used for iframe authentication)") + except Exception as e: + print(f"Error: {e}") + + # ========== JOBS ========== + print("\n=== JOBS ===") + try: + jobs = client.get_jobs() + print(json.dumps(jobs, indent=2)) + + if jobs and len(jobs) > 0: + job = jobs[0] + job_id = job.get('id') + print(f"\n=== STARTING JOB RUN: {job.get('name')} (ID: {job_id}) ===") + + # ========== START JOB RUN ========== + try: + job_run_request = { + "params": {}, + "data": {} + } + job_run = client.start_job_run(job_id, job_run_request) + job_run_id = job_run.get('jobRunId') + print(f"Job run started. Run ID: {job_run_id}") + + # ========== POLLING JOB RUN STATUS ========== + print("\n=== POLLING JOB RUN STATUS ===") + job_finished = False + job_attempt = 0 + max_job_attempts = 30 + + while not job_finished and job_attempt < max_job_attempts: + time.sleep(3) # Wait 3 seconds between polls + + job_status = client.get_job_run_status(job_id, job_run_id) + job_attempt += 1 + + status_info = job_status.get('status', {}) + print(f"Attempt {job_attempt}: Finished={job_status.get('finished')}, " + f"Entries={job_status.get('entries')}, " + f"Queued={status_info.get('queued')}, " + f"Review={status_info.get('review')}, " + f"Completed={status_info.get('completed')}, " + f"Errors={status_info.get('errors')}") + + if job_status.get('finished'): + job_finished = True + + # ========== GENERATE REVIEW DOCUMENT ========== + if job.get('reviewRequired') and status_info.get('review', 0) > 0: + print("\n=== GENERATING REVIEW DOCUMENT ===") + try: + review_doc = client.get_run_review_document(job_id, job_run_id) + print(f"Review document temporary file ID: {review_doc.get('temporaryFileId')}") + + # ========== DELIVER JOB RUN ========== + print("\n=== DELIVERING JOB RUN (after review) ===") + deliver_response = client.deliver_job_run_entries(job_id, job_run_id) + print(f"Job delivered successfully") + except Exception as e: + print(f"Error in review/deliver: {e}") + else: + print("\nNo review required or no items in review status.") + + if not job_finished: + print(f"Job run did not finish within {max_job_attempts} attempts") + + except Exception as e: + print(f"Error running job: {e}") + else: + print("No jobs available to run") + + except Exception as e: + print(f"Error: {e}") + + print("\n=== DONE ===") + print("All examples completed!") + + +if __name__ == '__main__': + main() \ No newline at end of file