diff --git a/bigframes/blob/_functions.py b/bigframes/blob/_functions.py index 8d1ca38e62..8dd9328fb8 100644 --- a/bigframes/blob/_functions.py +++ b/bigframes/blob/_functions.py @@ -99,6 +99,7 @@ def _create_udf(self): project=None, timeout=None, query_with_job=True, + publisher=self._session._publisher, ) return udf_name diff --git a/bigframes/core/events.py b/bigframes/core/events.py new file mode 100644 index 0000000000..d0e5f7ad69 --- /dev/null +++ b/bigframes/core/events.py @@ -0,0 +1,237 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import dataclasses +import datetime +import threading +from typing import Any, Callable, Optional, Set +import uuid + +import google.cloud.bigquery._job_helpers +import google.cloud.bigquery.job.query +import google.cloud.bigquery.table + +import bigframes.session.executor + + +class Subscriber: + def __init__(self, callback: Callable[[Event], None], *, publisher: Publisher): + self._publisher = publisher + self._callback = callback + self._subscriber_id = uuid.uuid4() + + def __call__(self, *args, **kwargs): + return self._callback(*args, **kwargs) + + def __hash__(self) -> int: + return hash(self._subscriber_id) + + def __eq__(self, value: object): + if not isinstance(value, Subscriber): + return NotImplemented + return value._subscriber_id == self._subscriber_id + + def close(self): + self._publisher.unsubscribe(self) + del self._publisher + del self._callback + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + if exc_value is not None: + self( + UnknownErrorEvent( + exc_type=exc_type, + exc_value=exc_value, + traceback=traceback, + ) + ) + self.close() + + +class Publisher: + def __init__(self): + self._subscribers_lock = threading.Lock() + self._subscribers: Set[Subscriber] = set() + + def subscribe(self, callback: Callable[[Event], None]) -> Subscriber: + # TODO(b/448176657): figure out how to handle subscribers/publishers in + # a background thread. Maybe subscribers should be thread-local? 
+ subscriber = Subscriber(callback, publisher=self) + with self._subscribers_lock: + self._subscribers.add(subscriber) + return subscriber + + def unsubscribe(self, subscriber: Subscriber): + with self._subscribers_lock: + self._subscribers.remove(subscriber) + + def publish(self, event: Event): + with self._subscribers_lock: + for subscriber in self._subscribers: + subscriber(event) + + +class Event: + pass + + +@dataclasses.dataclass(frozen=True) +class SessionClosed(Event): + session_id: str + + +class ExecutionStarted(Event): + pass + + +class ExecutionRunning(Event): + pass + + +@dataclasses.dataclass(frozen=True) +class ExecutionFinished(Event): + result: Optional[bigframes.session.executor.ExecuteResult] = None + + +@dataclasses.dataclass(frozen=True) +class UnknownErrorEvent(Event): + exc_type: Any + exc_value: Any + traceback: Any + + +@dataclasses.dataclass(frozen=True) +class BigQuerySentEvent(ExecutionRunning): + """Query sent to BigQuery.""" + + query: str + billing_project: Optional[str] = None + location: Optional[str] = None + job_id: Optional[str] = None + request_id: Optional[str] = None + + @classmethod + def from_bqclient(cls, event: google.cloud.bigquery._job_helpers.QuerySentEvent): + return cls( + query=event.query, + billing_project=event.billing_project, + location=event.location, + job_id=event.job_id, + request_id=event.request_id, + ) + + +@dataclasses.dataclass(frozen=True) +class BigQueryRetryEvent(ExecutionRunning): + """Query sent another time because the previous attempt failed.""" + + query: str + billing_project: Optional[str] = None + location: Optional[str] = None + job_id: Optional[str] = None + request_id: Optional[str] = None + + @classmethod + def from_bqclient(cls, event: google.cloud.bigquery._job_helpers.QueryRetryEvent): + return cls( + query=event.query, + billing_project=event.billing_project, + location=event.location, + job_id=event.job_id, + request_id=event.request_id, + ) + + +@dataclasses.dataclass(frozen=True) +class BigQueryReceivedEvent(ExecutionRunning): + """Query received and acknowledged by the BigQuery API.""" + + billing_project: Optional[str] = None + location: Optional[str] = None + job_id: Optional[str] = None + statement_type: Optional[str] = None + state: Optional[str] = None + query_plan: Optional[list[google.cloud.bigquery.job.query.QueryPlanEntry]] = None + created: Optional[datetime.datetime] = None + started: Optional[datetime.datetime] = None + ended: Optional[datetime.datetime] = None + + @classmethod + def from_bqclient( + cls, event: google.cloud.bigquery._job_helpers.QueryReceivedEvent + ): + return cls( + billing_project=event.billing_project, + location=event.location, + job_id=event.job_id, + statement_type=event.statement_type, + state=event.state, + query_plan=event.query_plan, + created=event.created, + started=event.started, + ended=event.ended, + ) + + +@dataclasses.dataclass(frozen=True) +class BigQueryFinishedEvent(ExecutionRunning): + """Query finished successfully.""" + + billing_project: Optional[str] = None + location: Optional[str] = None + query_id: Optional[str] = None + job_id: Optional[str] = None + destination: Optional[google.cloud.bigquery.table.TableReference] = None + total_rows: Optional[int] = None + total_bytes_processed: Optional[int] = None + slot_millis: Optional[int] = None + created: Optional[datetime.datetime] = None + started: Optional[datetime.datetime] = None + ended: Optional[datetime.datetime] = None + + @classmethod + def from_bqclient( + cls, event: 
google.cloud.bigquery._job_helpers.QueryFinishedEvent + ): + return cls( + billing_project=event.billing_project, + location=event.location, + query_id=event.query_id, + job_id=event.job_id, + destination=event.destination, + total_rows=event.total_rows, + total_bytes_processed=event.total_bytes_processed, + slot_millis=event.slot_millis, + created=event.created, + started=event.started, + ended=event.ended, + ) + + +@dataclasses.dataclass(frozen=True) +class BigQueryUnknownEvent(ExecutionRunning): + """Got unknown event from the BigQuery client library.""" + + # TODO: should we just skip sending unknown events? + + event: object + + @classmethod + def from_bqclient(cls, event): + return cls(event) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index de153fca48..ea04d48d33 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -4671,24 +4671,24 @@ def to_string( ) -> str | None: return self.to_pandas(allow_large_results=allow_large_results).to_string( buf, - columns, # type: ignore - col_space, - header, # type: ignore - index, - na_rep, - formatters, - float_format, - sparsify, - index_names, - justify, - max_rows, - max_cols, - show_dimensions, - decimal, - line_width, - min_rows, - max_colwidth, - encoding, + columns=columns, # type: ignore + col_space=col_space, + header=header, # type: ignore + index=index, + na_rep=na_rep, + formatters=formatters, + float_format=float_format, + sparsify=sparsify, + index_names=index_names, + justify=justify, + max_rows=max_rows, + max_cols=max_cols, + show_dimensions=show_dimensions, + decimal=decimal, + line_width=line_width, + min_rows=min_rows, + max_colwidth=max_colwidth, + encoding=encoding, ) def to_html( @@ -4721,28 +4721,28 @@ def to_html( ) -> str: return self.to_pandas(allow_large_results=allow_large_results).to_html( buf, - columns, # type: ignore - col_space, - header, - index, - na_rep, - formatters, - float_format, - sparsify, - index_names, - justify, # type: ignore - max_rows, - max_cols, - show_dimensions, - decimal, - bold_rows, - classes, - escape, - notebook, - border, - table_id, - render_links, - encoding, + columns=columns, # type: ignore + col_space=col_space, + header=header, + index=index, + na_rep=na_rep, + formatters=formatters, + float_format=float_format, + sparsify=sparsify, + index_names=index_names, + justify=justify, # type: ignore + max_rows=max_rows, + max_cols=max_cols, + show_dimensions=show_dimensions, + decimal=decimal, + bold_rows=bold_rows, + classes=classes, + escape=escape, + notebook=notebook, + border=border, + table_id=table_id, + render_links=render_links, + encoding=encoding, ) def to_markdown( @@ -4754,7 +4754,7 @@ def to_markdown( allow_large_results: Optional[bool] = None, **kwargs, ) -> str | None: - return self.to_pandas(allow_large_results=allow_large_results).to_markdown(buf, mode, index, **kwargs) # type: ignore + return self.to_pandas(allow_large_results=allow_large_results).to_markdown(buf, mode=mode, index=index, **kwargs) # type: ignore def to_pickle(self, path, *, allow_large_results=None, **kwargs) -> None: return self.to_pandas(allow_large_results=allow_large_results).to_pickle( diff --git a/bigframes/formatting_helpers.py b/bigframes/formatting_helpers.py index 48afb4fdbd..f75394c47d 100644 --- a/bigframes/formatting_helpers.py +++ b/bigframes/formatting_helpers.py @@ -13,11 +13,13 @@ # limitations under the License. 
"""Shared helper functions for formatting jobs related info.""" -# TODO(orrbradford): cleanup up typings and documenttion in this file + +from __future__ import annotations import datetime +import html import random -from typing import Any, Optional, Type, Union +from typing import Any, Optional, Type, TYPE_CHECKING, Union import bigframes_vendored.constants as constants import google.api_core.exceptions as api_core_exceptions @@ -25,7 +27,9 @@ import humanize import IPython import IPython.display as display -import ipywidgets as widgets + +if TYPE_CHECKING: + import bigframes.core.events GenericJob = Union[ bigquery.LoadJob, bigquery.ExtractJob, bigquery.QueryJob, bigquery.CopyJob @@ -58,39 +62,6 @@ def create_exception_with_feedback_link( return exception(constants.FEEDBACK_LINK) -def repr_query_job_html(query_job: Optional[bigquery.QueryJob]): - """Return query job in html format. - Args: - query_job (bigquery.QueryJob, Optional): - The job representing the execution of the query on the server. - Returns: - Pywidget html table. - """ - if query_job is None: - return display.HTML("No job information available") - if query_job.dry_run: - return display.HTML( - f"Computation deferred. Computation will process {get_formatted_bytes(query_job.total_bytes_processed)}" - ) - table_html = "" - table_html += "" - for key, value in query_job_prop_pairs.items(): - job_val = getattr(query_job, value) - if job_val is not None: - if key == "Job Id": # add link to job - table_html += f"""""" - elif key == "Slot Time": - table_html += ( - f"""""" - ) - elif key == "Bytes Processed": - table_html += f"""""" - else: - table_html += f"""""" - table_html += "
{key}{job_val}
{key}{get_formatted_time(job_val)}
{key}{get_formatted_bytes(job_val)}
{key}{job_val}
" - return widgets.HTML(table_html) - - def repr_query_job(query_job: Optional[bigquery.QueryJob]): """Return query job as a formatted string. Args: @@ -109,7 +80,11 @@ def repr_query_job(query_job: Optional[bigquery.QueryJob]): if job_val is not None: res += "\n" if key == "Job Id": # add link to job - res += f"""Job url: {get_job_url(query_job)}""" + res += f"""Job url: {get_job_url( + project_id=query_job.project, + location=query_job.location, + job_id=query_job.job_id, + )}""" elif key == "Slot Time": res += f"""{key}: {get_formatted_time(job_val)}""" elif key == "Bytes Processed": @@ -119,71 +94,90 @@ def repr_query_job(query_job: Optional[bigquery.QueryJob]): return res -def wait_for_query_job( - query_job: bigquery.QueryJob, - max_results: Optional[int] = None, - page_size: Optional[int] = None, - progress_bar: Optional[str] = None, -) -> bigquery.table.RowIterator: - """Return query results. Displays a progress bar while the query is running - Args: - query_job (bigquery.QueryJob, Optional): - The job representing the execution of the query on the server. - max_results (int, Optional): - The maximum number of rows the row iterator should return. - page_size (int, Optional): - The number of results to return on each results page. - progress_bar (str, Optional): - Which progress bar to show. - Returns: - A row iterator over the query results. - """ +current_display: Optional[display.HTML] = None +current_display_id: Optional[str] = None +previous_display_html: str = "" + + +def progress_callback( + event: bigframes.core.events.Event, +): + """Displays a progress bar while the query is running""" + global current_display, current_display_id, previous_display_html + + import bigframes._config + import bigframes.core.events + + progress_bar = bigframes._config.options.display.progress_bar + if progress_bar == "auto": progress_bar = "notebook" if in_ipython() else "terminal" - try: - if progress_bar == "notebook": - display_id = str(random.random()) - loading_bar = display.HTML(get_query_job_loading_html(query_job)) - display.display(loading_bar, display_id=display_id) - query_result = query_job.result( - max_results=max_results, page_size=page_size + if progress_bar == "notebook": + if ( + isinstance(event, bigframes.core.events.ExecutionStarted) + or current_display is None + or current_display_id is None + ): + previous_display_html = "" + current_display_id = str(random.random()) + current_display = display.HTML("Starting.") + display.display( + current_display, + display_id=current_display_id, ) - query_job.reload() + + if isinstance(event, bigframes.core.events.BigQuerySentEvent): + previous_display_html = render_bqquery_sent_event_html(event) display.update_display( - display.HTML(get_query_job_loading_html(query_job)), - display_id=display_id, + display.HTML(previous_display_html), + display_id=current_display_id, ) - elif progress_bar == "terminal": - initial_loading_bar = get_query_job_loading_string(query_job) - print(initial_loading_bar) - query_result = query_job.result( - max_results=max_results, page_size=page_size + elif isinstance(event, bigframes.core.events.BigQueryRetryEvent): + previous_display_html = render_bqquery_retry_event_html(event) + display.update_display( + display.HTML(previous_display_html), + display_id=current_display_id, ) - query_job.reload() - if initial_loading_bar != get_query_job_loading_string(query_job): - print(get_query_job_loading_string(query_job)) - else: - # No progress bar. 
- query_result = query_job.result( - max_results=max_results, page_size=page_size + elif isinstance(event, bigframes.core.events.BigQueryReceivedEvent): + previous_display_html = render_bqquery_received_event_html(event) + display.update_display( + display.HTML(previous_display_html), + display_id=current_display_id, ) - query_job.reload() - return query_result - except api_core_exceptions.RetryError as exc: - add_feedback_link(exc) - raise - except api_core_exceptions.GoogleAPICallError as exc: - add_feedback_link(exc) - raise - except KeyboardInterrupt: - query_job.cancel() - print( - f"Requested cancellation for {query_job.job_type.capitalize()}" - f" job {query_job.job_id} in location {query_job.location}..." - ) - # begin the cancel request before immediately rethrowing - raise + elif isinstance(event, bigframes.core.events.BigQueryFinishedEvent): + previous_display_html = render_bqquery_finished_event_html(event) + display.update_display( + display.HTML(previous_display_html), + display_id=current_display_id, + ) + elif isinstance(event, bigframes.core.events.ExecutionFinished): + display.update_display( + display.HTML(f"✅ Completed. {previous_display_html}"), + display_id=current_display_id, + ) + elif isinstance(event, bigframes.core.events.SessionClosed): + display.update_display( + display.HTML(f"Session {event.session_id} closed."), + display_id=current_display_id, + ) + elif progress_bar == "terminal": + if isinstance(event, bigframes.core.events.ExecutionStarted): + print("Starting execution.") + elif isinstance(event, bigframes.core.events.BigQuerySentEvent): + message = render_bqquery_sent_event_plaintext(event) + print(message) + elif isinstance(event, bigframes.core.events.BigQueryRetryEvent): + message = render_bqquery_retry_event_plaintext(event) + print(message) + elif isinstance(event, bigframes.core.events.BigQueryReceivedEvent): + message = render_bqquery_received_event_plaintext(event) + print(message) + elif isinstance(event, bigframes.core.events.BigQueryFinishedEvent): + message = render_bqquery_finished_event_plaintext(event) + print(message) + elif isinstance(event, bigframes.core.events.ExecutionFinished): + print("Execution done.") def wait_for_job(job: GenericJob, progress_bar: Optional[str] = None): @@ -234,24 +228,74 @@ def wait_for_job(job: GenericJob, progress_bar: Optional[str] = None): raise -def get_job_url(query_job: GenericJob): +def render_query_references( + *, + project_id: Optional[str], + location: Optional[str], + job_id: Optional[str], + request_id: Optional[str], +) -> str: + query_id = "" + if request_id and not job_id: + query_id = f" with request ID {project_id}:{location}.{request_id}" + return query_id + + +def render_job_link_html( + *, + project_id: Optional[str], + location: Optional[str], + job_id: Optional[str], +) -> str: + job_url = get_job_url( + project_id=project_id, + location=location, + job_id=job_id, + ) + if job_url: + job_link = f' [Job {project_id}:{location}.{job_id} details]' + else: + job_link = "" + return job_link + + +def render_job_link_plaintext( + *, + project_id: Optional[str], + location: Optional[str], + job_id: Optional[str], +) -> str: + job_url = get_job_url( + project_id=project_id, + location=location, + job_id=job_id, + ) + if job_url: + job_link = f" Job {project_id}:{location}.{job_id} details: {job_url}" + else: + job_link = "" + return job_link + + +def get_job_url( + *, + project_id: Optional[str], + location: Optional[str], + job_id: Optional[str], +): """Return url to the query job in cloud 
console. - Args: - query_job (GenericJob): - The job representing the execution of the query on the server. + Returns: String url. """ - if ( - query_job.project is None - or query_job.location is None - or query_job.job_id is None - ): + if project_id is None or location is None or job_id is None: return None - return f"""https://console.cloud.google.com/bigquery?project={query_job.project}&j=bq:{query_job.location}:{query_job.job_id}&page=queryresults""" + return f"""https://console.cloud.google.com/bigquery?project={project_id}&j=bq:{location}:{job_id}&page=queryresults""" -def get_query_job_loading_html(query_job: bigquery.QueryJob): +def render_bqquery_sent_event_html( + event: bigframes.core.events.BigQuerySentEvent, +) -> str: """Return progress bar html string Args: query_job (bigquery.QueryJob): @@ -259,18 +303,195 @@ def get_query_job_loading_html(query_job: bigquery.QueryJob): Returns: Html string. """ - return f"""Query job {query_job.job_id} is {query_job.state}. {get_bytes_processed_string(query_job.total_bytes_processed)}Open Job""" + job_link = render_job_link_html( + project_id=event.billing_project, + location=event.location, + job_id=event.job_id, + ) + query_id = render_query_references( + project_id=event.billing_project, + location=event.location, + job_id=event.job_id, + request_id=event.request_id, + ) + query_text_details = f"
SQL
{html.escape(event.query)}
" + + return f""" + Query started{query_id}.{job_link}{query_text_details} + """ -def get_query_job_loading_string(query_job: bigquery.QueryJob): - """Return progress bar string + +def render_bqquery_sent_event_plaintext( + event: bigframes.core.events.BigQuerySentEvent, +) -> str: + """Return progress bar html string Args: query_job (bigquery.QueryJob): The job representing the execution of the query on the server. Returns: - String + Html string. + """ + + job_link = render_job_link_plaintext( + project_id=event.billing_project, + location=event.location, + job_id=event.job_id, + ) + query_id = render_query_references( + project_id=event.billing_project, + location=event.location, + job_id=event.job_id, + request_id=event.request_id, + ) + + return f"Query started{query_id}.{job_link}" + + +def render_bqquery_retry_event_html( + event: bigframes.core.events.BigQueryRetryEvent, +) -> str: + """Return progress bar html string for retry event.""" + + job_link = render_job_link_html( + project_id=event.billing_project, + location=event.location, + job_id=event.job_id, + ) + query_id = render_query_references( + project_id=event.billing_project, + location=event.location, + job_id=event.job_id, + request_id=event.request_id, + ) + query_text_details = f"
SQL
{html.escape(event.query)}
" + + return f""" + Retrying query{query_id}.{job_link}{query_text_details} + """ + + +def render_bqquery_retry_event_plaintext( + event: bigframes.core.events.BigQueryRetryEvent, +) -> str: + """Return progress bar plaintext string for retry event.""" + + job_link = render_job_link_plaintext( + project_id=event.billing_project, + location=event.location, + job_id=event.job_id, + ) + query_id = render_query_references( + project_id=event.billing_project, + location=event.location, + job_id=event.job_id, + request_id=event.request_id, + ) + return f"Retrying query{query_id}.{job_link}" + + +def render_bqquery_received_event_html( + event: bigframes.core.events.BigQueryReceivedEvent, +) -> str: + """Return progress bar html string for received event.""" + + job_link = render_job_link_html( + project_id=event.billing_project, + location=event.location, + job_id=event.job_id, + ) + query_id = render_query_references( + project_id=event.billing_project, + location=event.location, + job_id=event.job_id, + request_id=None, + ) + + query_plan_details = "" + if event.query_plan: + plan_str = "\n".join([str(entry) for entry in event.query_plan]) + query_plan_details = f"
Query Plan
{html.escape(plan_str)}
" + + return f""" + Query{query_id} is {event.state}.{job_link}{query_plan_details} + """ + + +def render_bqquery_received_event_plaintext( + event: bigframes.core.events.BigQueryReceivedEvent, +) -> str: + """Return progress bar plaintext string for received event.""" + + job_link = render_job_link_plaintext( + project_id=event.billing_project, + location=event.location, + job_id=event.job_id, + ) + query_id = render_query_references( + project_id=event.billing_project, + location=event.location, + job_id=event.job_id, + request_id=None, + ) + return f"Query{query_id} is {event.state}.{job_link}" + + +def render_bqquery_finished_event_html( + event: bigframes.core.events.BigQueryFinishedEvent, +) -> str: + """Return progress bar html string for finished event.""" + + bytes_str = "" + if event.total_bytes_processed is not None: + bytes_str = f" {humanize.naturalsize(event.total_bytes_processed)}" + + slot_time_str = "" + if event.slot_millis is not None: + slot_time = datetime.timedelta(milliseconds=event.slot_millis) + slot_time_str = f" in {humanize.naturaldelta(slot_time)} of slot time" + + job_link = render_job_link_html( + project_id=event.billing_project, + location=event.location, + job_id=event.job_id, + ) + query_id = render_query_references( + project_id=event.billing_project, + location=event.location, + job_id=event.job_id, + request_id=None, + ) + return f""" + Query processed{bytes_str}{slot_time_str}{query_id}.{job_link} """ - return f"""Query job {query_job.job_id} is {query_job.state}.{get_bytes_processed_string(query_job.total_bytes_processed)} \n{get_job_url(query_job)}""" + + +def render_bqquery_finished_event_plaintext( + event: bigframes.core.events.BigQueryFinishedEvent, +) -> str: + """Return progress bar plaintext string for finished event.""" + + bytes_str = "" + if event.total_bytes_processed is not None: + bytes_str = f" {humanize.naturalsize(event.total_bytes_processed)} processed." + + slot_time_str = "" + if event.slot_millis is not None: + slot_time = datetime.timedelta(milliseconds=event.slot_millis) + slot_time_str = f" Slot time: {humanize.naturaldelta(slot_time)}." + + job_link = render_job_link_plaintext( + project_id=event.billing_project, + location=event.location, + job_id=event.job_id, + ) + query_id = render_query_references( + project_id=event.billing_project, + location=event.location, + job_id=event.job_id, + request_id=None, + ) + return f"Query{query_id} finished.{bytes_str}{slot_time_str}{job_link}" def get_base_job_loading_html(job: GenericJob): @@ -281,7 +502,11 @@ def get_base_job_loading_html(job: GenericJob): Returns: Html string. """ - return f"""{job.job_type.capitalize()} job {job.job_id} is {job.state}. Open Job""" + return f"""{job.job_type.capitalize()} job {job.job_id} is {job.state}. Open Job""" def get_base_job_loading_string(job: GenericJob): @@ -292,7 +517,11 @@ def get_base_job_loading_string(job: GenericJob): Returns: String """ - return f"""{job.job_type.capitalize()} job {job.job_id} is {job.state}. \n{get_job_url(job)}""" + return f"""{job.job_type.capitalize()} job {job.job_id} is {job.state}. 
\n{get_job_url( + project_id=job.job_id, + location=job.location, + job_id=job.job_id, + )}""" def get_formatted_time(val): diff --git a/bigframes/functions/_function_client.py b/bigframes/functions/_function_client.py index 641bf52dc9..8a88a14040 100644 --- a/bigframes/functions/_function_client.py +++ b/bigframes/functions/_function_client.py @@ -145,6 +145,7 @@ def _create_bq_function(self, create_function_ddl: str) -> None: timeout=None, metrics=None, query_with_job=True, + publisher=self._session._publisher, ) logger.info(f"Created bigframes function {query_job.ddl_target_routine}") diff --git a/bigframes/functions/function.py b/bigframes/functions/function.py index 99b89131e7..242daf7525 100644 --- a/bigframes/functions/function.py +++ b/bigframes/functions/function.py @@ -219,7 +219,13 @@ def __call__(self, *args, **kwargs): args_string = ", ".join(map(bf_sql.simple_literal, args)) sql = f"SELECT `{str(self._udf_def.routine_ref)}`({args_string})" - iter, job = bf_io_bigquery.start_query_with_client(self._session.bqclient, sql=sql, query_with_job=True, job_config=bigquery.QueryJobConfig()) # type: ignore + iter, job = bf_io_bigquery.start_query_with_client( + self._session.bqclient, + sql=sql, + query_with_job=True, + job_config=bigquery.QueryJobConfig(), + publisher=self._session._publisher, + ) # type: ignore return list(iter.to_arrow().to_pydict().values())[0][0] @property @@ -297,7 +303,13 @@ def __call__(self, *args, **kwargs): args_string = ", ".join(map(bf_sql.simple_literal, args)) sql = f"SELECT `{str(self._udf_def.routine_ref)}`({args_string})" - iter, job = bf_io_bigquery.start_query_with_client(self._session.bqclient, sql=sql, query_with_job=True, job_config=bigquery.QueryJobConfig()) # type: ignore + iter, job = bf_io_bigquery.start_query_with_client( + self._session.bqclient, + sql=sql, + query_with_job=True, + job_config=bigquery.QueryJobConfig(), + publisher=self._session._publisher, + ) # type: ignore return list(iter.to_arrow().to_pydict().values())[0][0] @property diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index 2ea10132bc..2455637b0a 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -291,6 +291,7 @@ def clean_up_by_session_id( session.bqclient, location=location, project=project, + publisher=session._publisher, ) bigframes.session._io.bigquery.delete_tables_matching_session_id( diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index f0cec864b4..6c90838b17 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -67,18 +67,14 @@ import bigframes.constants import bigframes.core from bigframes.core import blocks, log_adapter, utils +import bigframes.core.events import bigframes.core.pyformat - -# Even though the ibis.backends.bigquery import is unused, it's needed -# to register new and replacement ops with the Ibis BigQuery backend. +import bigframes.formatting_helpers import bigframes.functions._function_session as bff_session import bigframes.functions.function as bff from bigframes.session import bigquery_session, bq_caching_executor, executor import bigframes.session._io.bigquery as bf_io_bigquery -import bigframes.session.anonymous_dataset import bigframes.session.clients -import bigframes.session.loader -import bigframes.session.metrics import bigframes.session.validation # Avoid circular imports. 
@@ -140,6 +136,11 @@ def __init__( _warn_if_bf_version_is_obsolete() + # Publisher needs to be created before the other objects, especially + # the executors, because they access it. + self._publisher = bigframes.core.events.Publisher() + self._publisher.subscribe(bigframes.formatting_helpers.progress_callback) + if context is None: context = bigquery_options.BigQueryOptions() @@ -232,12 +233,14 @@ def __init__( location=self._location, session_id=self._session_id, kms_key=self._bq_kms_key_name, + publisher=self._publisher, ) # Session temp tables don't support specifying kms key, so use anon dataset if kms key specified self._session_resource_manager = ( bigquery_session.SessionResourceManager( self.bqclient, self._location, + publisher=self._publisher, ) if (self._bq_kms_key_name is None) else None @@ -254,6 +257,7 @@ def __init__( scan_index_uniqueness=self._strictly_ordered, force_total_order=self._strictly_ordered, metrics=self._metrics, + publisher=self._publisher, ) self._executor: executor.Executor = bq_caching_executor.BigQueryCachingExecutor( bqclient=self._clients_provider.bqclient, @@ -263,6 +267,7 @@ def __init__( strictly_ordered=self._strictly_ordered, metrics=self._metrics, enable_polars_execution=context.enable_polars_execution, + publisher=self._publisher, ) def __del__(self): @@ -373,10 +378,16 @@ def close(self): remote_function_session = getattr(self, "_function_session", None) if remote_function_session: - self._function_session.clean_up( + remote_function_session.clean_up( self.bqclient, self.cloudfunctionsclient, self.session_id ) + publisher_session = getattr(self, "_publisher", None) + if publisher_session: + publisher_session.publish( + bigframes.core.events.SessionClosed(self.session_id) + ) + @overload def read_gbq( # type: ignore[overload-overlap] self, @@ -2153,6 +2164,7 @@ def _start_query_ml_ddl( timeout=None, query_with_job=True, job_retry=third_party_gcb_retry.DEFAULT_ML_JOB_RETRY, + publisher=self._publisher, ) return iterator, query_job @@ -2180,6 +2192,7 @@ def _create_object_table(self, path: str, connection: str) -> str: project=None, timeout=None, query_with_job=True, + publisher=self._publisher, ) return table diff --git a/bigframes/session/_io/bigquery/__init__.py b/bigframes/session/_io/bigquery/__init__.py index 83f63e8b9a..aa56dc0040 100644 --- a/bigframes/session/_io/bigquery/__init__.py +++ b/bigframes/session/_io/bigquery/__init__.py @@ -29,11 +29,13 @@ import google.api_core.exceptions import google.api_core.retry import google.cloud.bigquery as bigquery +import google.cloud.bigquery._job_helpers +import google.cloud.bigquery.table from bigframes.core import log_adapter import bigframes.core.compile.googlesql as googlesql +import bigframes.core.events import bigframes.core.sql -import bigframes.formatting_helpers as formatting_helpers import bigframes.session.metrics CHECK_DRIVE_PERMISSIONS = "\nCheck https://cloud.google.com/bigquery/docs/query-drive-data#Google_Drive_permissions." 
@@ -238,6 +240,24 @@ def add_and_trim_labels(job_config): ) +def create_bq_event_callback(publisher): + def publish_bq_event(event): + if isinstance(event, google.cloud.bigquery._job_helpers.QueryFinishedEvent): + bf_event = bigframes.core.events.BigQueryFinishedEvent.from_bqclient(event) + elif isinstance(event, google.cloud.bigquery._job_helpers.QueryReceivedEvent): + bf_event = bigframes.core.events.BigQueryReceivedEvent.from_bqclient(event) + elif isinstance(event, google.cloud.bigquery._job_helpers.QueryRetryEvent): + bf_event = bigframes.core.events.BigQueryRetryEvent.from_bqclient(event) + elif isinstance(event, google.cloud.bigquery._job_helpers.QuerySentEvent): + bf_event = bigframes.core.events.BigQuerySentEvent.from_bqclient(event) + else: + bf_event = bigframes.core.events.BigQueryUnknownEvent(event) + + publisher.publish(bf_event) + + return publish_bq_event + + @overload def start_query_with_client( bq_client: bigquery.Client, @@ -249,7 +269,8 @@ def start_query_with_client( timeout: Optional[float], metrics: Optional[bigframes.session.metrics.ExecutionMetrics], query_with_job: Literal[True], -) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]: + publisher: bigframes.core.events.Publisher, +) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]: ... @@ -264,7 +285,8 @@ def start_query_with_client( timeout: Optional[float], metrics: Optional[bigframes.session.metrics.ExecutionMetrics], query_with_job: Literal[False], -) -> Tuple[bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: + publisher: bigframes.core.events.Publisher, +) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: ... @@ -280,7 +302,8 @@ def start_query_with_client( metrics: Optional[bigframes.session.metrics.ExecutionMetrics], query_with_job: Literal[True], job_retry: google.api_core.retry.Retry, -) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]: + publisher: bigframes.core.events.Publisher, +) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]: ... @@ -296,7 +319,8 @@ def start_query_with_client( metrics: Optional[bigframes.session.metrics.ExecutionMetrics], query_with_job: Literal[False], job_retry: google.api_core.retry.Retry, -) -> Tuple[bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: + publisher: bigframes.core.events.Publisher, +) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: ... @@ -315,23 +339,26 @@ def start_query_with_client( # https://github.com/googleapis/python-bigquery/pull/2256 merged, likely # version 3.36.0 or later. job_retry: google.api_core.retry.Retry = third_party_gcb_retry.DEFAULT_JOB_RETRY, -) -> Tuple[bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: + publisher: bigframes.core.events.Publisher, +) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: """ Starts query job and waits for results. """ + # Note: Ensure no additional labels are added to job_config after this + # point, as `add_and_trim_labels` ensures the label count does not + # exceed MAX_LABELS_COUNT. + add_and_trim_labels(job_config) + try: - # Note: Ensure no additional labels are added to job_config after this - # point, as `add_and_trim_labels` ensures the label count does not - # exceed MAX_LABELS_COUNT. 
- add_and_trim_labels(job_config) if not query_with_job: - results_iterator = bq_client.query_and_wait( + results_iterator = bq_client._query_and_wait_bigframes( sql, job_config=job_config, location=location, project=project, api_timeout=timeout, job_retry=job_retry, + callback=create_bq_event_callback(publisher), ) if metrics is not None: metrics.count_job_stats(row_iterator=results_iterator) @@ -350,14 +377,32 @@ def start_query_with_client( ex.message += CHECK_DRIVE_PERMISSIONS raise - opts = bigframes.options.display - if opts.progress_bar is not None and not query_job.configuration.dry_run: - results_iterator = formatting_helpers.wait_for_query_job( - query_job, - progress_bar=opts.progress_bar, + if not query_job.configuration.dry_run: + publisher.publish( + bigframes.core.events.BigQuerySentEvent( + sql, + billing_project=query_job.project, + location=query_job.location, + job_id=query_job.job_id, + request_id=None, + ) + ) + results_iterator = query_job.result() + if not query_job.configuration.dry_run: + publisher.publish( + bigframes.core.events.BigQueryFinishedEvent( + billing_project=query_job.project, + location=query_job.location, + job_id=query_job.job_id, + destination=query_job.destination, + total_rows=results_iterator.total_rows, + total_bytes_processed=query_job.total_bytes_processed, + slot_millis=query_job.slot_millis, + created=query_job.created, + started=query_job.started, + ended=query_job.ended, + ) ) - else: - results_iterator = query_job.result() if metrics is not None: metrics.count_job_stats(query_job=query_job) @@ -399,6 +444,8 @@ def create_bq_dataset_reference( bq_client: bigquery.Client, location: Optional[str] = None, project: Optional[str] = None, + *, + publisher: bigframes.core.events.Publisher, ) -> bigquery.DatasetReference: """Create and identify dataset(s) for temporary BQ resources. @@ -430,6 +477,7 @@ def create_bq_dataset_reference( timeout=None, metrics=None, query_with_job=True, + publisher=publisher, ) # The anonymous dataset is used by BigQuery to write query results and diff --git a/bigframes/session/_io/bigquery/read_gbq_table.py b/bigframes/session/_io/bigquery/read_gbq_table.py index 00531ce25d..f8a379aee9 100644 --- a/bigframes/session/_io/bigquery/read_gbq_table.py +++ b/bigframes/session/_io/bigquery/read_gbq_table.py @@ -26,8 +26,9 @@ import bigframes_vendored.constants as constants import google.api_core.exceptions import google.cloud.bigquery as bigquery +import google.cloud.bigquery.table -import bigframes.core.sql +import bigframes.core.events import bigframes.exceptions as bfe import bigframes.session._io.bigquery @@ -43,6 +44,7 @@ def get_table_metadata( *, cache: Dict[bigquery.TableReference, Tuple[datetime.datetime, bigquery.Table]], use_cache: bool = True, + publisher: bigframes.core.events.Publisher, ) -> Tuple[datetime.datetime, google.cloud.bigquery.table.Table]: """Get the table metadata, either from cache or via REST API.""" @@ -59,6 +61,7 @@ def get_table_metadata( # Don't warn, because that will already have been taken care of. should_warn=False, should_dry_run=False, + publisher=publisher, ): # This warning should only happen if the cached snapshot_time will # have any effect on bigframes (b/437090788). 
For example, with @@ -101,13 +104,14 @@ def get_table_metadata( def is_time_travel_eligible( bqclient: bigquery.Client, - table: bigquery.table.Table, + table: google.cloud.bigquery.table.Table, columns: Optional[Sequence[str]], snapshot_time: datetime.datetime, filter_str: Optional[str] = None, *, should_warn: bool, should_dry_run: bool, + publisher: bigframes.core.events.Publisher, ): """Check if a table is eligible to use time-travel. @@ -184,6 +188,7 @@ def is_time_travel_eligible( timeout=None, metrics=None, query_with_job=False, + publisher=publisher, ) return True @@ -210,10 +215,8 @@ def is_time_travel_eligible( def infer_unique_columns( - bqclient: bigquery.Client, - table: bigquery.table.Table, + table: google.cloud.bigquery.table.Table, index_cols: List[str], - metadata_only: bool = False, ) -> Tuple[str, ...]: """Return a set of columns that can provide a unique row key or empty if none can be inferred. @@ -227,14 +230,37 @@ def infer_unique_columns( # Essentially, just reordering the primary key to match the index col order return tuple(index_col for index_col in index_cols if index_col in primary_keys) - if primary_keys or metadata_only or (not index_cols): - # Sometimes not worth scanning data to check uniqueness + if primary_keys: return primary_keys + + return () + + +def check_if_index_columns_are_unique( + bqclient: bigquery.Client, + table: google.cloud.bigquery.table.Table, + index_cols: List[str], + *, + publisher: bigframes.core.events.Publisher, +) -> Tuple[str, ...]: + import bigframes.core.sql + import bigframes.session._io.bigquery + # TODO(b/337925142): Avoid a "SELECT *" subquery here by ensuring # table_expression only selects just index_cols. is_unique_sql = bigframes.core.sql.is_distinct_sql(index_cols, table.reference) job_config = bigquery.QueryJobConfig() - results = bqclient.query_and_wait(is_unique_sql, job_config=job_config) + results, _ = bigframes.session._io.bigquery.start_query_with_client( + bq_client=bqclient, + sql=is_unique_sql, + job_config=job_config, + timeout=None, + location=None, + project=None, + metrics=None, + query_with_job=False, + publisher=publisher, + ) row = next(iter(results)) if row["total_count"] == row["distinct_count"]: @@ -243,7 +269,7 @@ def infer_unique_columns( def _get_primary_keys( - table: bigquery.table.Table, + table: google.cloud.bigquery.table.Table, ) -> List[str]: """Get primary keys from table if they are set.""" @@ -261,7 +287,7 @@ def _get_primary_keys( def _is_table_clustered_or_partitioned( - table: bigquery.table.Table, + table: google.cloud.bigquery.table.Table, ) -> bool: """Returns True if the table is clustered or partitioned.""" @@ -284,7 +310,7 @@ def _is_table_clustered_or_partitioned( def get_index_cols( - table: bigquery.table.Table, + table: google.cloud.bigquery.table.Table, index_col: Iterable[str] | str | Iterable[int] diff --git a/bigframes/session/anonymous_dataset.py b/bigframes/session/anonymous_dataset.py index ec624d4eb4..3c1757806b 100644 --- a/bigframes/session/anonymous_dataset.py +++ b/bigframes/session/anonymous_dataset.py @@ -20,6 +20,7 @@ import google.cloud.bigquery as bigquery from bigframes import constants +import bigframes.core.events from bigframes.session import temporary_storage import bigframes.session._io.bigquery as bf_io_bigquery @@ -37,10 +38,12 @@ def __init__( location: str, session_id: str, *, - kms_key: Optional[str] = None + kms_key: Optional[str] = None, + publisher: bigframes.core.events.Publisher, ): self.bqclient = bqclient self._location = location + 
self._publisher = publisher self.session_id = session_id self._table_ids: List[bigquery.TableReference] = [] @@ -62,6 +65,7 @@ def dataset(self) -> bigquery.DatasetReference: self._datset_ref = bf_io_bigquery.create_bq_dataset_reference( self.bqclient, location=self._location, + publisher=self._publisher, ) return self._datset_ref diff --git a/bigframes/session/bigquery_session.py b/bigframes/session/bigquery_session.py index 883087df07..99c13007d8 100644 --- a/bigframes/session/bigquery_session.py +++ b/bigframes/session/bigquery_session.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import annotations + import datetime import logging import threading @@ -23,7 +25,9 @@ import google.cloud.bigquery as bigquery from bigframes.core.compile import googlesql +import bigframes.core.events from bigframes.session import temporary_storage +import bigframes.session._io.bigquery as bfbqio KEEPALIVE_QUERY_TIMEOUT_SECONDS = 5.0 @@ -38,12 +42,19 @@ class SessionResourceManager(temporary_storage.TemporaryStorageManager): Responsible for allocating and cleaning up temporary gbq tables used by a BigFrames session. """ - def __init__(self, bqclient: bigquery.Client, location: str): + def __init__( + self, + bqclient: bigquery.Client, + location: str, + *, + publisher: bigframes.core.events.Publisher, + ): self.bqclient = bqclient self._location = location self._session_id: Optional[str] = None self._sessiondaemon: Optional[RecurringTaskDaemon] = None self._session_lock = threading.RLock() + self._publisher = publisher @property def location(self): @@ -84,21 +95,38 @@ def create_temp_table( ddl = f"CREATE TEMP TABLE `_SESSION`.{googlesql.identifier(table_ref.table_id)} ({fields_string}){cluster_string}" - job = self.bqclient.query( - ddl, job_config=job_config, location=self.location + _, job = bfbqio.start_query_with_client( + self.bqclient, + ddl, + job_config=job_config, + location=self.location, + project=None, + timeout=None, + metrics=None, + query_with_job=True, + publisher=self._publisher, ) job.result() # return the fully qualified table, so it can be used outside of the session - return job.destination + destination = job.destination + assert destination is not None, "Failure to create temp table." + return destination def close(self): if self._sessiondaemon is not None: self._sessiondaemon.stop() if self._session_id is not None and self.bqclient is not None: - self.bqclient.query_and_wait( + bfbqio.start_query_with_client( + self.bqclient, f"CALL BQ.ABORT_SESSION('{self._session_id}')", + job_config=bigquery.QueryJobConfig(), location=self.location, + project=None, + timeout=None, + metrics=None, + query_with_job=False, + publisher=self._publisher, ) def _get_session_id(self) -> str: @@ -109,8 +137,16 @@ def _get_session_id(self) -> str: job_config = bigquery.QueryJobConfig(create_session=True) # Make sure the session is a new one, not one associated with another query. 
job_config.use_query_cache = False - query_job = self.bqclient.query( - "SELECT 1", job_config=job_config, location=self.location + _, query_job = bfbqio.start_query_with_client( + self.bqclient, + "SELECT 1", + job_config=job_config, + location=self.location, + project=None, + timeout=None, + metrics=None, + query_with_job=True, + publisher=self._publisher, ) query_job.result() # blocks until finished assert query_job.session_info is not None @@ -133,11 +169,16 @@ def _keep_session_alive(self): ] ) try: - self.bqclient.query_and_wait( + bfbqio.start_query_with_client( + self.bqclient, "SELECT 1", - location=self.location, job_config=job_config, - wait_timeout=KEEPALIVE_QUERY_TIMEOUT_SECONDS, + location=self.location, + project=None, + timeout=KEEPALIVE_QUERY_TIMEOUT_SECONDS, + metrics=None, + query_with_job=False, + publisher=self._publisher, ) except Exception as e: logging.warning("BigQuery session keep-alive query errored : %s", e) diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py index cbda9bc640..d4cfa13aa4 100644 --- a/bigframes/session/bq_caching_executor.py +++ b/bigframes/session/bq_caching_executor.py @@ -33,6 +33,7 @@ import bigframes.core from bigframes.core import compile, local_data, rewrite import bigframes.core.compile.sqlglot.sqlglot_ir as sqlglot_ir +import bigframes.core.events import bigframes.core.guid import bigframes.core.identifiers import bigframes.core.nodes as nodes @@ -140,6 +141,7 @@ def __init__( strictly_ordered: bool = True, metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None, enable_polars_execution: bool = False, + publisher: bigframes.core.events.Publisher, ): self.bqclient = bqclient self.storage_manager = storage_manager @@ -149,6 +151,9 @@ def __init__( self.loader = loader self.bqstoragereadclient = bqstoragereadclient self._enable_polars_execution = enable_polars_execution + self._publisher = publisher + + # TODO(tswast): Send events from semi-executors, too. 
self._semi_executors: Sequence[semi_executor.SemiExecutor] = ( read_api_execution.ReadApiSemiExecutor( bqstoragereadclient=bqstoragereadclient, @@ -188,6 +193,8 @@ def execute( array_value: bigframes.core.ArrayValue, execution_spec: ex_spec.ExecutionSpec, ) -> executor.ExecuteResult: + self._publisher.publish(bigframes.core.events.ExecutionStarted()) + # TODO: Support export jobs in combination with semi executors if execution_spec.destination_spec is None: plan = self.prepare_plan(array_value.node, target="simplify") @@ -196,6 +203,11 @@ def execute( plan, ordered=execution_spec.ordered, peek=execution_spec.peek ) if maybe_result: + self._publisher.publish( + bigframes.core.events.ExecutionFinished( + result=maybe_result, + ) + ) return maybe_result if isinstance(execution_spec.destination_spec, ex_spec.TableOutputSpec): @@ -204,7 +216,13 @@ def execute( "Ordering and peeking not supported for gbq export" ) # separate path for export_gbq, as it has all sorts of annoying logic, such as possibly running as dml - return self._export_gbq(array_value, execution_spec.destination_spec) + result = self._export_gbq(array_value, execution_spec.destination_spec) + self._publisher.publish( + bigframes.core.events.ExecutionFinished( + result=result, + ) + ) + return result result = self._execute_plan_gbq( array_value.node, @@ -219,6 +237,11 @@ def execute( if isinstance(execution_spec.destination_spec, ex_spec.GcsOutputSpec): self._export_result_gcs(result, execution_spec.destination_spec) + self._publisher.publish( + bigframes.core.events.ExecutionFinished( + result=result, + ) + ) return result def _export_result_gcs( @@ -243,6 +266,7 @@ def _export_result_gcs( location=None, timeout=None, query_with_job=True, + publisher=self._publisher, ) def _maybe_find_existing_table( @@ -404,6 +428,7 @@ def _run_execute_query( location=None, timeout=None, query_with_job=True, + publisher=self._publisher, ) else: return bq_io.start_query_with_client( @@ -415,6 +440,7 @@ def _run_execute_query( location=None, timeout=None, query_with_job=False, + publisher=self._publisher, ) except google.api_core.exceptions.BadRequest as e: diff --git a/bigframes/session/direct_gbq_execution.py b/bigframes/session/direct_gbq_execution.py index 7538c9300f..9e7db87301 100644 --- a/bigframes/session/direct_gbq_execution.py +++ b/bigframes/session/direct_gbq_execution.py @@ -21,6 +21,7 @@ from bigframes.core import compile, nodes from bigframes.core.compile import sqlglot +import bigframes.core.events from bigframes.session import executor, semi_executor import bigframes.session._io.bigquery as bq_io @@ -31,7 +32,11 @@ # reference for validating more complex executors. 
class DirectGbqExecutor(semi_executor.SemiExecutor): def __init__( - self, bqclient: bigquery.Client, compiler: Literal["ibis", "sqlglot"] = "ibis" + self, + bqclient: bigquery.Client, + compiler: Literal["ibis", "sqlglot"] = "ibis", + *, + publisher: bigframes.core.events.Publisher, ): self.bqclient = bqclient self._compile_fn = ( @@ -39,6 +44,7 @@ def __init__( if compiler == "ibis" else sqlglot.SQLGlotCompiler()._compile_sql ) + self._publisher = publisher def execute( self, @@ -83,4 +89,5 @@ def _run_execute_query( timeout=None, metrics=None, query_with_job=False, + publisher=self._publisher, ) diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py index 94d8db6f36..940fdc1352 100644 --- a/bigframes/session/loader.py +++ b/bigframes/session/loader.py @@ -50,6 +50,7 @@ from bigframes.core import guid, identifiers, local_data, nodes, ordering, utils import bigframes.core as core import bigframes.core.blocks as blocks +import bigframes.core.events import bigframes.core.schema as schemata import bigframes.dtypes import bigframes.formatting_helpers as formatting_helpers @@ -262,6 +263,8 @@ def __init__( scan_index_uniqueness: bool, force_total_order: bool, metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None, + *, + publisher: bigframes.core.events.Publisher, ): self._bqclient = bqclient self._write_client = write_client @@ -273,6 +276,7 @@ def __init__( bigquery.TableReference, Tuple[datetime.datetime, bigquery.Table] ] = {} self._metrics = metrics + self._publisher = publisher # Unfortunate circular reference, but need to pass reference when constructing objects self._session = session self._clock = session_time.BigQuerySyncedClock(bqclient) @@ -499,6 +503,7 @@ def read_gbq_table( # type: ignore[overload-overlap] force_total_order: Optional[bool] = ..., n_rows: Optional[int] = None, index_col_in_columns: bool = False, + publish_execution: bool = True, ) -> dataframe.DataFrame: ... @@ -522,6 +527,7 @@ def read_gbq_table( force_total_order: Optional[bool] = ..., n_rows: Optional[int] = None, index_col_in_columns: bool = False, + publish_execution: bool = True, ) -> pandas.Series: ... @@ -544,6 +550,7 @@ def read_gbq_table( force_total_order: Optional[bool] = None, n_rows: Optional[int] = None, index_col_in_columns: bool = False, + publish_execution: bool = True, ) -> dataframe.DataFrame | pandas.Series: """Read a BigQuery table into a BigQuery DataFrames DataFrame. @@ -603,8 +610,12 @@ def read_gbq_table( when the index is selected from the data columns (e.g., in a ``read_csv`` scenario). The column will be used as the DataFrame's index and removed from the list of value columns. + publish_execution (bool, optional): + If True, sends an execution started and stopped event if this + causes a query. Set to False if using read_gbq_table from + another function that is reporting execution. """ - import bigframes._tools.strings + import bigframes.core.events import bigframes.dataframe as dataframe # --------------------------------- @@ -636,6 +647,7 @@ def read_gbq_table( bq_time=self._clock.get_time(), cache=self._df_snapshot, use_cache=use_cache, + publisher=self._publisher, ) if table.location.casefold() != self._storage_manager.location.casefold(): @@ -756,6 +768,7 @@ def read_gbq_table( filter_str, should_warn=True, should_dry_run=True, + publisher=self._publisher, ) # ---------------------------- @@ -768,12 +781,27 @@ def read_gbq_table( # TODO(b/338065601): Provide a way to assume uniqueness and avoid this # check. 
primary_key = bf_read_gbq_table.infer_unique_columns( - bqclient=self._bqclient, table=table, index_cols=index_cols, - # If non in strict ordering mode, don't go through overhead of scanning index column(s) to determine if unique - metadata_only=not self._scan_index_uniqueness, ) + + # If non in strict ordering mode, don't go through overhead of scanning index column(s) to determine if unique + if not primary_key and self._scan_index_uniqueness and index_cols: + if publish_execution: + self._publisher.publish( + bigframes.core.events.ExecutionStarted(), + ) + primary_key = bf_read_gbq_table.check_if_index_columns_are_unique( + self._bqclient, + table=table, + index_cols=index_cols, + publisher=self._publisher, + ) + if publish_execution: + self._publisher.publish( + bigframes.core.events.ExecutionFinished(), + ) + schema = schemata.ArraySchema.from_bq_table(table) if not include_all_columns: schema = schema.select(index_cols + columns) @@ -991,6 +1019,12 @@ def read_gbq_query( query_job, list(columns), index_cols ) + # We want to make sure we show progress when we actually do execute a + # query. Since we have got this far, we know it's not a dry run. + self._publisher.publish( + bigframes.core.events.ExecutionStarted(), + ) + query_job_for_metrics: Optional[bigquery.QueryJob] = None destination: Optional[bigquery.TableReference] = None @@ -1046,20 +1080,28 @@ def read_gbq_query( # makes sense to download the results beyond the first page, even if # there is a job and destination table available. if query_job_for_metrics is None and rows is not None: - return bf_read_gbq_query.create_dataframe_from_row_iterator( + df = bf_read_gbq_query.create_dataframe_from_row_iterator( rows, session=self._session, index_col=index_col, columns=columns, ) + self._publisher.publish( + bigframes.core.events.ExecutionFinished(), + ) + return df # We already checked rows, so if there's no destination table, then # there are no results to return. if destination is None: - return bf_read_gbq_query.create_dataframe_from_query_job_stats( + df = bf_read_gbq_query.create_dataframe_from_query_job_stats( query_job_for_metrics, session=self._session, ) + self._publisher.publish( + bigframes.core.events.ExecutionFinished(), + ) + return df # If the query was DDL or DML, return some job metadata. See # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.statement_type @@ -1070,10 +1112,14 @@ def read_gbq_query( query_job_for_metrics is not None and not bf_read_gbq_query.should_return_query_results(query_job_for_metrics) ): - return bf_read_gbq_query.create_dataframe_from_query_job_stats( + df = bf_read_gbq_query.create_dataframe_from_query_job_stats( query_job_for_metrics, session=self._session, ) + self._publisher.publish( + bigframes.core.events.ExecutionFinished(), + ) + return df # Speed up counts by getting counts from result metadata. if rows is not None: @@ -1083,16 +1129,21 @@ def read_gbq_query( else: n_rows = None - return self.read_gbq_table( + df = self.read_gbq_table( f"{destination.project}.{destination.dataset_id}.{destination.table_id}", index_col=index_col, columns=columns, use_cache=configuration["query"]["useQueryCache"], force_total_order=force_total_order, n_rows=n_rows, + publish_execution=False, # max_results and filters are omitted because they are already # handled by to_query(), above. 
         )
+        self._publisher.publish(
+            bigframes.core.events.ExecutionFinished(),
+        )
+        return df
 
     def _query_to_destination(
         self,
@@ -1194,6 +1245,7 @@ def _start_query_with_job_optional(
             project=None,
             metrics=None,
             query_with_job=False,
+            publisher=self._publisher,
         )
         return rows
 
@@ -1219,6 +1271,7 @@ def _start_query_with_job(
             project=None,
             metrics=None,
             query_with_job=True,
+            publisher=self._publisher,
         )
         return query_job
 
diff --git a/bigframes/testing/mocks.py b/bigframes/testing/mocks.py
index 8d9997b1df..ff210419fd 100644
--- a/bigframes/testing/mocks.py
+++ b/bigframes/testing/mocks.py
@@ -143,6 +143,7 @@ def query_and_wait_mock(query, *args, job_config=None, **kwargs):
 
     bqclient.query.side_effect = query_mock
     bqclient.query_and_wait.side_effect = query_and_wait_mock
+    bqclient._query_and_wait_bigframes.side_effect = query_and_wait_mock
 
     clients_provider = mock.create_autospec(bigframes.session.clients.ClientsProvider)
     type(clients_provider).bqclient = mock.PropertyMock(return_value=bqclient)
diff --git a/setup.py b/setup.py
index 2aef514749..abc760b691 100644
--- a/setup.py
+++ b/setup.py
@@ -39,7 +39,7 @@
     "gcsfs >=2023.3.0, !=2025.5.0",
     "geopandas >=0.12.2",
     "google-auth >=2.15.0,<3.0",
-    "google-cloud-bigquery[bqstorage,pandas] >=3.31.0",
+    "google-cloud-bigquery[bqstorage,pandas] >=3.36.0",
     # 2.30 needed for arrow support.
     "google-cloud-bigquery-storage >= 2.30.0, < 3.0.0",
     "google-cloud-functions >=1.12.0",
diff --git a/testing/constraints-3.9.txt b/testing/constraints-3.9.txt
index 8df3a3a2c3..eceec07dc4 100644
--- a/testing/constraints-3.9.txt
+++ b/testing/constraints-3.9.txt
@@ -6,7 +6,7 @@ geopandas==0.12.2
 google-auth==2.15.0
 google-cloud-bigtable==2.24.0
 google-cloud-pubsub==2.21.4
-google-cloud-bigquery==3.31.0
+google-cloud-bigquery==3.36.0
 google-cloud-functions==1.12.0
 google-cloud-bigquery-connection==1.12.0
 google-cloud-iam==2.12.1
diff --git a/tests/system/small/engines/conftest.py b/tests/system/small/engines/conftest.py
index 9699cc6a61..a775731cde 100644
--- a/tests/system/small/engines/conftest.py
+++ b/tests/system/small/engines/conftest.py
@@ -19,7 +19,7 @@
 import pytest
 
 import bigframes
-from bigframes.core import ArrayValue, local_data
+from bigframes.core import ArrayValue, events, local_data
 from bigframes.session import (
     direct_gbq_execution,
     local_scan_executor,
@@ -50,11 +50,14 @@ def engine(request, bigquery_client: bigquery.Client) -> semi_executor.SemiExecutor:
         return local_scan_executor.LocalScanExecutor()
     if request.param == "polars":
         return polars_executor.PolarsExecutor()
+    publisher = events.Publisher()
    if request.param == "bq":
-        return direct_gbq_execution.DirectGbqExecutor(bigquery_client)
+        return direct_gbq_execution.DirectGbqExecutor(
+            bigquery_client, publisher=publisher
+        )
     if request.param == "bq-sqlglot":
         return direct_gbq_execution.DirectGbqExecutor(
-            bigquery_client, compiler="sqlglot"
+            bigquery_client, compiler="sqlglot", publisher=publisher
         )
     raise ValueError(f"Unrecognized param: {request.param}")
diff --git a/tests/system/small/engines/test_aggregation.py b/tests/system/small/engines/test_aggregation.py
index 9b4efe8cbe..4225d5dff7 100644
--- a/tests/system/small/engines/test_aggregation.py
+++ b/tests/system/small/engines/test_aggregation.py
@@ -15,7 +15,14 @@
 from google.cloud import bigquery
 import pytest
 
-from bigframes.core import agg_expressions, array_value, expression, identifiers, nodes
+from bigframes.core import (
+    agg_expressions,
+    array_value,
+    events,
+    expression,
+    identifiers,
+    nodes,
+)
 import bigframes.operations.aggregations as agg_ops
 from bigframes.session import direct_gbq_execution, polars_executor
 from bigframes.testing.engine_utils import assert_equivalence_execution
@@ -93,9 +100,12 @@ def test_sql_engines_median_op_aggregates(
         scalars_array_value,
         agg_ops.MedianOp(),
     ).node
-    left_engine = direct_gbq_execution.DirectGbqExecutor(bigquery_client)
+    publisher = events.Publisher()
+    left_engine = direct_gbq_execution.DirectGbqExecutor(
+        bigquery_client, publisher=publisher
+    )
     right_engine = direct_gbq_execution.DirectGbqExecutor(
-        bigquery_client, compiler="sqlglot"
+        bigquery_client, compiler="sqlglot", publisher=publisher
     )
     assert_equivalence_execution(node, left_engine, right_engine)
diff --git a/tests/system/small/engines/test_windowing.py b/tests/system/small/engines/test_windowing.py
index f344a3b60a..a34d7b8f38 100644
--- a/tests/system/small/engines/test_windowing.py
+++ b/tests/system/small/engines/test_windowing.py
@@ -18,6 +18,7 @@
 from bigframes.core import (
     agg_expressions,
     array_value,
+    events,
     expression,
     identifiers,
     nodes,
@@ -64,8 +65,11 @@ def test_engines_with_rows_window(
         skip_reproject_unsafe=False,
     )
 
-    bq_executor = direct_gbq_execution.DirectGbqExecutor(bigquery_client)
+    publisher = events.Publisher()
+    bq_executor = direct_gbq_execution.DirectGbqExecutor(
+        bigquery_client, publisher=publisher
+    )
     bq_sqlgot_executor = direct_gbq_execution.DirectGbqExecutor(
-        bigquery_client, compiler="sqlglot"
+        bigquery_client, compiler="sqlglot", publisher=publisher
     )
     assert_equivalence_execution(window_node, bq_executor, bq_sqlgot_executor)
diff --git a/tests/system/small/functions/test_remote_function.py b/tests/system/small/functions/test_remote_function.py
index 28fab19144..f81bdf8931 100644
--- a/tests/system/small/functions/test_remote_function.py
+++ b/tests/system/small/functions/test_remote_function.py
@@ -27,6 +27,7 @@
 
 import bigframes
 import bigframes.clients
+import bigframes.core.events
 import bigframes.dtypes
 import bigframes.exceptions
 from bigframes.functions import _utils as bff_utils
@@ -769,6 +770,7 @@ def test_read_gbq_function_runs_existing_udf_array_output(session, routine_id_unique):
         timeout=None,
         metrics=None,
         query_with_job=True,
+        publisher=bigframes.core.events.Publisher(),
     )
 
     func = session.read_gbq_function(routine_id_unique)
@@ -807,6 +809,7 @@ def test_read_gbq_function_runs_existing_udf_2_params_array_output(
         timeout=None,
         metrics=None,
         query_with_job=True,
+        publisher=bigframes.core.events.Publisher(),
     )
 
     func = session.read_gbq_function(routine_id_unique)
@@ -847,6 +850,7 @@ def test_read_gbq_function_runs_existing_udf_4_params_array_output(
         timeout=None,
         metrics=None,
         query_with_job=True,
+        publisher=bigframes.core.events.Publisher(),
     )
 
     func = session.read_gbq_function(routine_id_unique)
diff --git a/tests/system/small/test_bq_sessions.py b/tests/system/small/test_bq_sessions.py
index 7aad19bd8f..801346600d 100644
--- a/tests/system/small/test_bq_sessions.py
+++ b/tests/system/small/test_bq_sessions.py
@@ -17,10 +17,10 @@
 
 import google
 import google.api_core.exceptions
-import google.cloud
 from google.cloud import bigquery
 import pytest
 
+import bigframes.core.events
 from bigframes.session import bigquery_session
 
 TEST_SCHEMA = [
@@ -39,12 +39,14 @@ def session_resource_manager(
     bigquery_client,
 ) -> bigquery_session.SessionResourceManager:
-    return bigquery_session.SessionResourceManager(bigquery_client, "US")
+    return bigquery_session.SessionResourceManager(
+        bigquery_client, "US", publisher=bigframes.core.events.Publisher()
+    )
 
 
 def test_bq_session_create_temp_table_clustered(bigquery_client: bigquery.Client):
     session_resource_manager = bigquery_session.SessionResourceManager(
-        bigquery_client, "US"
+        bigquery_client, "US", publisher=bigframes.core.events.Publisher()
     )
     cluster_cols = ["string field", "bool field"]
@@ -68,7 +70,7 @@ def test_bq_session_create_temp_table_clustered(bigquery_client: bigquery.Client):
 
 def test_bq_session_create_multi_temp_tables(bigquery_client: bigquery.Client):
     session_resource_manager = bigquery_session.SessionResourceManager(
-        bigquery_client, "US"
+        bigquery_client, "US", publisher=bigframes.core.events.Publisher()
     )
 
     def create_table():
diff --git a/tests/system/small/test_progress_bar.py b/tests/system/small/test_progress_bar.py
index 8a323831b5..0c9c4070f4 100644
--- a/tests/system/small/test_progress_bar.py
+++ b/tests/system/small/test_progress_bar.py
@@ -23,7 +23,7 @@
 import bigframes.formatting_helpers as formatting_helpers
 from bigframes.session import MAX_INLINE_DF_BYTES
 
-job_load_message_regex = r"\w+ job [\w-]+ is \w+\."
+job_load_message_regex = r"Query"
 EXPECTED_DRY_RUN_MESSAGE = "Computation deferred. Computation will process"
 
@@ -56,7 +56,7 @@ def test_progress_bar_scalar(penguins_df_default_index: bf.dataframe.DataFrame,
     with bf.option_context("display.progress_bar", "terminal"):
         penguins_df_default_index["body_mass_g"].head(10).mean()
 
-    assert capsys.readouterr().out == ""
+    assert_loading_msg_exist(capsys.readouterr().out)
 
 
 def test_progress_bar_scalar_allow_large_results(
@@ -100,37 +100,19 @@ def test_progress_bar_load_jobs(
         capsys.readouterr()  # clear output
         session.read_csv(path)
 
-    assert_loading_msg_exist(capsys.readouterr().out)
+    assert_loading_msg_exist(capsys.readouterr().out, pattern="Load")
 
 
-def assert_loading_msg_exist(capystOut: str, pattern=job_load_message_regex):
-    numLoadingMsg = 0
-    lines = capystOut.split("\n")
+def assert_loading_msg_exist(capstdout: str, pattern=job_load_message_regex):
+    num_loading_msg = 0
+    lines = capstdout.split("\n")
     lines = [line for line in lines if len(line) > 0]
 
     assert len(lines) > 0
     for line in lines:
-        if re.match(pattern, line) is not None:
-            numLoadingMsg += 1
-    assert numLoadingMsg > 0
-
-
-def test_query_job_repr_html(penguins_df_default_index: bf.dataframe.DataFrame):
-    with bf.option_context("display.progress_bar", "terminal"):
-        penguins_df_default_index.to_pandas(allow_large_results=True)
-    query_job_repr = formatting_helpers.repr_query_job_html(
-        penguins_df_default_index.query_job
-    ).value
-
-    string_checks = [
-        "Job Id",
-        "Destination Table",
-        "Slot Time",
-        "Bytes Processed",
-        "Cache hit",
-    ]
-    for string in string_checks:
-        assert string in query_job_repr
+        if re.search(pattern, line) is not None:
+            num_loading_msg += 1
+    assert num_loading_msg > 0
 
 
 def test_query_job_repr(penguins_df_default_index: bf.dataframe.DataFrame):
diff --git a/tests/unit/session/test_io_bigquery.py b/tests/unit/session/test_io_bigquery.py
index c451d74d0f..57ac3d88f7 100644
--- a/tests/unit/session/test_io_bigquery.py
+++ b/tests/unit/session/test_io_bigquery.py
@@ -22,6 +22,7 @@
 
 import bigframes
 from bigframes.core import log_adapter
+import bigframes.core.events
 import bigframes.pandas as bpd
 import bigframes.session._io.bigquery as io_bq
 from bigframes.testing import mocks
@@ -236,6 +237,7 @@ def test_start_query_with_client_labels_length_limit_met(
         timeout=timeout,
         metrics=None,
         query_with_job=True,
+        publisher=bigframes.core.events.Publisher(),
     )
 
     assert job_config.labels is not None
diff --git a/tests/unit/session/test_read_gbq_table.py b/tests/unit/session/test_read_gbq_table.py
index 0c67e05813..d21f0000a9 100644
--- a/tests/unit/session/test_read_gbq_table.py
+++ b/tests/unit/session/test_read_gbq_table.py
@@ -24,13 +24,12 @@
 
 
 @pytest.mark.parametrize(
-    ("index_cols", "primary_keys", "values_distinct", "expected"),
+    ("index_cols", "primary_keys", "expected"),
     (
-        (["col1", "col2"], ["col1", "col2", "col3"], False, ("col1", "col2", "col3")),
+        (["col1", "col2"], ["col1", "col2", "col3"], ("col1", "col2", "col3")),
         (
             ["col1", "col2", "col3"],
             ["col1", "col2", "col3"],
-            True,
             ("col1", "col2", "col3"),
         ),
         (
@@ -39,15 +38,14 @@
                 "col3",
                 "col2",
             ],
-            True,
             ("col2", "col3"),
         ),
-        (["col1", "col2"], [], False, ()),
-        ([], ["col1", "col2", "col3"], False, ("col1", "col2", "col3")),
-        ([], [], False, ()),
+        (["col1", "col2"], [], ()),
+        ([], ["col1", "col2", "col3"], ("col1", "col2", "col3")),
+        ([], [], ()),
     ),
 )
-def test_infer_unique_columns(index_cols, primary_keys, values_distinct, expected):
+def test_infer_unique_columns(index_cols, primary_keys, expected):
     """If a primary key is set on the table, we use that as the index column
     by default, no error should be raised in this case.
@@ -79,6 +77,49 @@ def test_infer_unique_columns(index_cols, primary_keys, expected):
             "columns": primary_keys,
         },
     }
+
+    result = bf_read_gbq_table.infer_unique_columns(table, index_cols)
+
+    assert result == expected
+
+
+@pytest.mark.parametrize(
+    ("index_cols", "values_distinct", "expected"),
+    (
+        (
+            ["col1", "col2", "col3"],
+            True,
+            ("col1", "col2", "col3"),
+        ),
+        (
+            ["col2", "col3", "col1"],
+            True,
+            ("col2", "col3", "col1"),
+        ),
+        (["col1", "col2"], False, ()),
+        ([], False, ()),
+    ),
+)
+def test_check_if_index_columns_are_unique(index_cols, values_distinct, expected):
+    table = google.cloud.bigquery.Table.from_api_repr(
+        {
+            "tableReference": {
+                "projectId": "my-project",
+                "datasetId": "my_dataset",
+                "tableId": "my_table",
+            },
+            "clustering": {
+                "fields": ["col1", "col2"],
+            },
+        },
+    )
+    table.schema = (
+        google.cloud.bigquery.SchemaField("col1", "INT64"),
+        google.cloud.bigquery.SchemaField("col2", "INT64"),
+        google.cloud.bigquery.SchemaField("col3", "INT64"),
+        google.cloud.bigquery.SchemaField("col4", "INT64"),
+    )
+
     bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True)
     bqclient.project = "test-project"
     session = mocks.create_bigquery_session(
@@ -87,13 +128,18 @@ def test_infer_unique_columns(index_cols, primary_keys, values_distinct, expected):
     # Mock bqclient _after_ creating session to override its mocks.
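+    # The session now issues queries through the client's private
+    # _query_and_wait_bigframes hook (available once setup.py bumps
+    # google-cloud-bigquery to >=3.36.0, above), so tests stub that
+    # method rather than query_and_wait.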
     bqclient.get_table.return_value = table
-    bqclient.query_and_wait.side_effect = None
-    bqclient.query_and_wait.return_value = (
+    bqclient._query_and_wait_bigframes.side_effect = None
+    bqclient._query_and_wait_bigframes.return_value = (
         {"total_count": 3, "distinct_count": 3 if values_distinct else 2},
     )
     table._properties["location"] = session._location
 
-    result = bf_read_gbq_table.infer_unique_columns(bqclient, table, index_cols)
+    result = bf_read_gbq_table.check_if_index_columns_are_unique(
+        bqclient=bqclient,
+        table=table,
+        index_cols=index_cols,
+        publisher=session._publisher,
+    )
 
     assert result == expected
diff --git a/tests/unit/session/test_session.py b/tests/unit/session/test_session.py
index 63c82eb30f..d05957b941 100644
--- a/tests/unit/session/test_session.py
+++ b/tests/unit/session/test_session.py
@@ -247,7 +247,7 @@ def test_read_gbq_cached_table():
         table,
     )
 
-    session.bqclient.query_and_wait = mock.MagicMock(
+    session.bqclient._query_and_wait_bigframes = mock.MagicMock(
         return_value=({"total_count": 3, "distinct_count": 2},)
     )
     session.bqclient.get_table.return_value = table
@@ -278,7 +278,7 @@ def test_read_gbq_cached_table_doesnt_warn_for_anonymous_tables_and_doesnt_inclu
         table,
     )
 
-    session.bqclient.query_and_wait = mock.MagicMock(
+    session.bqclient._query_and_wait_bigframes = mock.MagicMock(
         return_value=({"total_count": 3, "distinct_count": 2},)
     )
     session.bqclient.get_table.return_value = table
@@ -306,7 +306,9 @@ def test_default_index_warning_raised_by_read_gbq(table):
     bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True)
     bqclient.project = "test-project"
     bqclient.get_table.return_value = table
-    bqclient.query_and_wait.return_value = ({"total_count": 3, "distinct_count": 2},)
+    bqclient._query_and_wait_bigframes.return_value = (
+        {"total_count": 3, "distinct_count": 2},
+    )
     session = mocks.create_bigquery_session(
         bqclient=bqclient,
         # DefaultIndexWarning is only relevant for strict mode.
@@ -333,7 +335,9 @@ def test_default_index_warning_not_raised_by_read_gbq_index_col_sequential_int64
     bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True)
     bqclient.project = "test-project"
     bqclient.get_table.return_value = table
-    bqclient.query_and_wait.return_value = ({"total_count": 4, "distinct_count": 3},)
+    bqclient._query_and_wait_bigframes.return_value = (
+        {"total_count": 4, "distinct_count": 3},
+    )
     session = mocks.create_bigquery_session(
         bqclient=bqclient,
         # DefaultIndexWarning is only relevant for strict mode.
@@ -382,7 +386,7 @@ def test_default_index_warning_not_raised_by_read_gbq_index_col_columns(
     bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True)
     bqclient.project = "test-project"
     bqclient.get_table.return_value = table
-    bqclient.query_and_wait.return_value = (
+    bqclient._query_and_wait_bigframes.return_value = (
         {"total_count": total_count, "distinct_count": distinct_count},
     )
     session = mocks.create_bigquery_session(
@@ -492,6 +496,7 @@ def query_mock(query, *args, **kwargs):
         return session_query_mock(query, *args, **kwargs)
 
     session.bqclient.query_and_wait = query_mock
+    session.bqclient._query_and_wait_bigframes = query_mock
 
     def get_table_mock(table_ref):
         table = google.cloud.bigquery.Table(
diff --git a/tests/unit/test_formatting_helpers.py b/tests/unit/test_formatting_helpers.py
index 588ef6e824..9dc1379496 100644
--- a/tests/unit/test_formatting_helpers.py
+++ b/tests/unit/test_formatting_helpers.py
@@ -19,6 +19,7 @@
 import google.cloud.bigquery as bigquery
 import pytest
 
+import bigframes.core.events as bfevents
 import bigframes.formatting_helpers as formatting_helpers
 import bigframes.version
 
@@ -30,7 +31,7 @@ def test_wait_for_query_job_error_includes_feedback_link():
     )
 
     with pytest.raises(api_core_exceptions.BadRequest) as cap_exc:
-        formatting_helpers.wait_for_query_job(mock_query_job)
+        formatting_helpers.wait_for_job(mock_query_job)
 
     cap_exc.match("Test message 123.")
     cap_exc.match(constants.FEEDBACK_LINK)
@@ -70,3 +71,129 @@ def test_get_formatted_bytes(test_input, expected):
 )
 def test_get_formatted_time(test_input, expected):
     assert formatting_helpers.get_formatted_time(test_input) == expected
+
+
+def test_render_bqquery_sent_event_html():
+    event = bfevents.BigQuerySentEvent(
+        query="SELECT * FROM my_table",
+        job_id="my-job-id",
+        location="us-central1",
+        billing_project="my-project",
+    )
+    html = formatting_helpers.render_bqquery_sent_event_html(event)
+    assert "SELECT * FROM my_table" in html
+    assert "my-job-id" in html
+    assert "us-central1" in html
+    assert "my-project" in html
+    assert "<details>" in html
+
+
+def test_render_bqquery_sent_event_plaintext():
+    event = bfevents.BigQuerySentEvent(
+        query="SELECT * FROM my_table",
+        job_id="my-job-id",
+        location="us-central1",
+        billing_project="my-project",
+    )
+    text = formatting_helpers.render_bqquery_sent_event_plaintext(event)
+    assert "my-job-id" in text
+    assert "us-central1" in text
+    assert "my-project" in text
+    assert "SELECT * FROM my_table" not in text
+
+
+def test_render_bqquery_retry_event_html():
+    event = bfevents.BigQueryRetryEvent(
+        query="SELECT * FROM my_table",
+        job_id="my-job-id",
+        location="us-central1",
+        billing_project="my-project",
+    )
+    html = formatting_helpers.render_bqquery_retry_event_html(event)
+    assert "Retrying query" in html
+    assert "SELECT * FROM my_table" in html
+    assert "my-job-id" in html
+    assert "us-central1" in html
+    assert "my-project" in html
+    assert "<details>" in html
+
+
+def test_render_bqquery_retry_event_plaintext():
+    event = bfevents.BigQueryRetryEvent(
+        query="SELECT * FROM my_table",
+        job_id="my-job-id",
+        location="us-central1",
+        billing_project="my-project",
+    )
+    text = formatting_helpers.render_bqquery_retry_event_plaintext(event)
+    assert "Retrying query" in text
+    assert "my-job-id" in text
+    assert "us-central1" in text
+    assert "my-project" in text
+    assert "SELECT * FROM my_table" not in text
+
+
+def test_render_bqquery_received_event_html():
+    mock_plan_entry = mock.create_autospec(
+        bigquery.job.query.QueryPlanEntry, instance=True
+    )
+    mock_plan_entry.__str__.return_value = "mocked plan"
+    event = bfevents.BigQueryReceivedEvent(
+        job_id="my-job-id",
+        location="us-central1",
+        billing_project="my-project",
+        state="RUNNING",
+        query_plan=[mock_plan_entry],
+    )
+    html = formatting_helpers.render_bqquery_received_event_html(event)
+    assert "Query" in html
+    assert "my-job-id" in html
+    assert "is RUNNING" in html
+    assert "<details>" in html
+    assert "mocked plan" in html
+
+
+def test_render_bqquery_received_event_plaintext():
+    event = bfevents.BigQueryReceivedEvent(
+        job_id="my-job-id",
+        location="us-central1",
+        billing_project="my-project",
+        state="RUNNING",
+        query_plan=[],
+    )
+    text = formatting_helpers.render_bqquery_received_event_plaintext(event)
+    assert "Query" in text
+    assert "my-job-id" in text
+    assert "is RUNNING" in text
+    assert "Query Plan" not in text
+
+
+def test_render_bqquery_finished_event_html():
+    event = bfevents.BigQueryFinishedEvent(
+        job_id="my-job-id",
+        location="us-central1",
+        billing_project="my-project",
+        total_bytes_processed=1000,
+        slot_millis=2000,
+    )
+    html = formatting_helpers.render_bqquery_finished_event_html(event)
+    assert "Query" in html
+    assert "my-job-id" in html
+    assert "processed 1.0 kB" in html
+    assert "2 seconds of slot time" in html
+
+
+def test_render_bqquery_finished_event_plaintext():
+    event = bfevents.BigQueryFinishedEvent(
+        job_id="my-job-id",
+        location="us-central1",
+        billing_project="my-project",
+        total_bytes_processed=1000,
+        slot_millis=2000,
+    )
+    text = formatting_helpers.render_bqquery_finished_event_plaintext(event)
+    assert "Query" in text
+    assert "my-job-id" in text
+    assert "finished" in text
+    assert "1.0 kB processed" in text
+    assert "Slot time: 2 seconds" in text