diff --git a/bigframes/blob/_functions.py b/bigframes/blob/_functions.py
index 8d1ca38e62..8dd9328fb8 100644
--- a/bigframes/blob/_functions.py
+++ b/bigframes/blob/_functions.py
@@ -99,6 +99,7 @@ def _create_udf(self):
project=None,
timeout=None,
query_with_job=True,
+ publisher=self._session._publisher,
)
return udf_name
diff --git a/bigframes/core/events.py b/bigframes/core/events.py
new file mode 100644
index 0000000000..d0e5f7ad69
--- /dev/null
+++ b/bigframes/core/events.py
@@ -0,0 +1,237 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import annotations
+
+import dataclasses
+import datetime
+import threading
+from typing import Any, Callable, Optional, Set
+import uuid
+
+import google.cloud.bigquery._job_helpers
+import google.cloud.bigquery.job.query
+import google.cloud.bigquery.table
+
+import bigframes.session.executor
+
+
+class Subscriber:
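+    """Wraps a callback registered with a :class:`Publisher`.
+
+    Each instance gets a unique ID, so hashing and equality are
+    per-registration rather than per-callback. Also usable as a context
+    manager: an exception raised in the block is forwarded to the callback
+    as an ``UnknownErrorEvent`` before unsubscribing.
+    """
+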
+ def __init__(self, callback: Callable[[Event], None], *, publisher: Publisher):
+ self._publisher = publisher
+ self._callback = callback
+ self._subscriber_id = uuid.uuid4()
+
+ def __call__(self, *args, **kwargs):
+ return self._callback(*args, **kwargs)
+
+ def __hash__(self) -> int:
+ return hash(self._subscriber_id)
+
+ def __eq__(self, value: object):
+ if not isinstance(value, Subscriber):
+ return NotImplemented
+ return value._subscriber_id == self._subscriber_id
+
+ def close(self):
+ self._publisher.unsubscribe(self)
+ del self._publisher
+ del self._callback
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ if exc_value is not None:
+ self(
+ UnknownErrorEvent(
+ exc_type=exc_type,
+ exc_value=exc_value,
+ traceback=traceback,
+ )
+ )
+ self.close()
+
+
+class Publisher:
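+    """Thread-safe fan-out of events to registered subscribers."""
+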
+ def __init__(self):
+ self._subscribers_lock = threading.Lock()
+ self._subscribers: Set[Subscriber] = set()
+
+ def subscribe(self, callback: Callable[[Event], None]) -> Subscriber:
+ # TODO(b/448176657): figure out how to handle subscribers/publishers in
+ # a background thread. Maybe subscribers should be thread-local?
+ subscriber = Subscriber(callback, publisher=self)
+ with self._subscribers_lock:
+ self._subscribers.add(subscriber)
+ return subscriber
+
+ def unsubscribe(self, subscriber: Subscriber):
+ with self._subscribers_lock:
+ self._subscribers.remove(subscriber)
+
+ def publish(self, event: Event):
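+        # Callbacks run synchronously while the lock is held, so they must
+        # be quick and must not call subscribe/unsubscribe (threading.Lock
+        # is not reentrant).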
+ with self._subscribers_lock:
+ for subscriber in self._subscribers:
+ subscriber(event)
+
+
+class Event:
+    """Base class for all events published by bigframes."""
+
+
+@dataclasses.dataclass(frozen=True)
+class SessionClosed(Event):
+ session_id: str
+
+
+class ExecutionStarted(Event):
+    """A DataFrame/Series execution has begun."""
+
+
+class ExecutionRunning(Event):
+    """Base class for progress events published while an execution runs."""
+
+
+@dataclasses.dataclass(frozen=True)
+class ExecutionFinished(Event):
+ result: Optional[bigframes.session.executor.ExecuteResult] = None
+
+
+@dataclasses.dataclass(frozen=True)
+class UnknownErrorEvent(Event):
+ exc_type: Any
+ exc_value: Any
+ traceback: Any
+
+
+@dataclasses.dataclass(frozen=True)
+class BigQuerySentEvent(ExecutionRunning):
+ """Query sent to BigQuery."""
+
+ query: str
+ billing_project: Optional[str] = None
+ location: Optional[str] = None
+ job_id: Optional[str] = None
+ request_id: Optional[str] = None
+
+ @classmethod
+ def from_bqclient(cls, event: google.cloud.bigquery._job_helpers.QuerySentEvent):
+ return cls(
+ query=event.query,
+ billing_project=event.billing_project,
+ location=event.location,
+ job_id=event.job_id,
+ request_id=event.request_id,
+ )
+
+
+@dataclasses.dataclass(frozen=True)
+class BigQueryRetryEvent(ExecutionRunning):
+ """Query sent another time because the previous attempt failed."""
+
+ query: str
+ billing_project: Optional[str] = None
+ location: Optional[str] = None
+ job_id: Optional[str] = None
+ request_id: Optional[str] = None
+
+ @classmethod
+ def from_bqclient(cls, event: google.cloud.bigquery._job_helpers.QueryRetryEvent):
+ return cls(
+ query=event.query,
+ billing_project=event.billing_project,
+ location=event.location,
+ job_id=event.job_id,
+ request_id=event.request_id,
+ )
+
+
+@dataclasses.dataclass(frozen=True)
+class BigQueryReceivedEvent(ExecutionRunning):
+ """Query received and acknowledged by the BigQuery API."""
+
+ billing_project: Optional[str] = None
+ location: Optional[str] = None
+ job_id: Optional[str] = None
+ statement_type: Optional[str] = None
+ state: Optional[str] = None
+ query_plan: Optional[list[google.cloud.bigquery.job.query.QueryPlanEntry]] = None
+ created: Optional[datetime.datetime] = None
+ started: Optional[datetime.datetime] = None
+ ended: Optional[datetime.datetime] = None
+
+ @classmethod
+ def from_bqclient(
+ cls, event: google.cloud.bigquery._job_helpers.QueryReceivedEvent
+ ):
+ return cls(
+ billing_project=event.billing_project,
+ location=event.location,
+ job_id=event.job_id,
+ statement_type=event.statement_type,
+ state=event.state,
+ query_plan=event.query_plan,
+ created=event.created,
+ started=event.started,
+ ended=event.ended,
+ )
+
+
+@dataclasses.dataclass(frozen=True)
+class BigQueryFinishedEvent(ExecutionRunning):
+ """Query finished successfully."""
+
+ billing_project: Optional[str] = None
+ location: Optional[str] = None
+ query_id: Optional[str] = None
+ job_id: Optional[str] = None
+ destination: Optional[google.cloud.bigquery.table.TableReference] = None
+ total_rows: Optional[int] = None
+ total_bytes_processed: Optional[int] = None
+ slot_millis: Optional[int] = None
+ created: Optional[datetime.datetime] = None
+ started: Optional[datetime.datetime] = None
+ ended: Optional[datetime.datetime] = None
+
+ @classmethod
+ def from_bqclient(
+ cls, event: google.cloud.bigquery._job_helpers.QueryFinishedEvent
+ ):
+ return cls(
+ billing_project=event.billing_project,
+ location=event.location,
+ query_id=event.query_id,
+ job_id=event.job_id,
+ destination=event.destination,
+ total_rows=event.total_rows,
+ total_bytes_processed=event.total_bytes_processed,
+ slot_millis=event.slot_millis,
+ created=event.created,
+ started=event.started,
+ ended=event.ended,
+ )
+
+
+@dataclasses.dataclass(frozen=True)
+class BigQueryUnknownEvent(ExecutionRunning):
+ """Got unknown event from the BigQuery client library."""
+
+ # TODO: should we just skip sending unknown events?
+
+ event: object
+
+ @classmethod
+ def from_bqclient(cls, event):
+ return cls(event)
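+
+
+# Illustrative sketch (not part of the public API): subscribing a custom
+# callback to a session's publisher. `session` is assumed to be an active
+# bigframes Session, whose `_publisher` attribute is added in this change.
+#
+#     def on_event(event: Event) -> None:
+#         if isinstance(event, BigQueryFinishedEvent):
+#             print(f"processed {event.total_bytes_processed} bytes")
+#
+#     with session._publisher.subscribe(on_event):
+#         ...  # run DataFrame operations; events arrive synchronously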
diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py
index de153fca48..ea04d48d33 100644
--- a/bigframes/dataframe.py
+++ b/bigframes/dataframe.py
@@ -4671,24 +4671,24 @@ def to_string(
) -> str | None:
return self.to_pandas(allow_large_results=allow_large_results).to_string(
buf,
- columns, # type: ignore
- col_space,
- header, # type: ignore
- index,
- na_rep,
- formatters,
- float_format,
- sparsify,
- index_names,
- justify,
- max_rows,
- max_cols,
- show_dimensions,
- decimal,
- line_width,
- min_rows,
- max_colwidth,
- encoding,
+ columns=columns, # type: ignore
+ col_space=col_space,
+ header=header, # type: ignore
+ index=index,
+ na_rep=na_rep,
+ formatters=formatters,
+ float_format=float_format,
+ sparsify=sparsify,
+ index_names=index_names,
+ justify=justify,
+ max_rows=max_rows,
+ max_cols=max_cols,
+ show_dimensions=show_dimensions,
+ decimal=decimal,
+ line_width=line_width,
+ min_rows=min_rows,
+ max_colwidth=max_colwidth,
+ encoding=encoding,
)
def to_html(
@@ -4721,28 +4721,28 @@ def to_html(
) -> str:
return self.to_pandas(allow_large_results=allow_large_results).to_html(
buf,
- columns, # type: ignore
- col_space,
- header,
- index,
- na_rep,
- formatters,
- float_format,
- sparsify,
- index_names,
- justify, # type: ignore
- max_rows,
- max_cols,
- show_dimensions,
- decimal,
- bold_rows,
- classes,
- escape,
- notebook,
- border,
- table_id,
- render_links,
- encoding,
+ columns=columns, # type: ignore
+ col_space=col_space,
+ header=header,
+ index=index,
+ na_rep=na_rep,
+ formatters=formatters,
+ float_format=float_format,
+ sparsify=sparsify,
+ index_names=index_names,
+ justify=justify, # type: ignore
+ max_rows=max_rows,
+ max_cols=max_cols,
+ show_dimensions=show_dimensions,
+ decimal=decimal,
+ bold_rows=bold_rows,
+ classes=classes,
+ escape=escape,
+ notebook=notebook,
+ border=border,
+ table_id=table_id,
+ render_links=render_links,
+ encoding=encoding,
)
def to_markdown(
@@ -4754,7 +4754,7 @@ def to_markdown(
allow_large_results: Optional[bool] = None,
**kwargs,
) -> str | None:
- return self.to_pandas(allow_large_results=allow_large_results).to_markdown(buf, mode, index, **kwargs) # type: ignore
+ return self.to_pandas(allow_large_results=allow_large_results).to_markdown(buf, mode=mode, index=index, **kwargs) # type: ignore
def to_pickle(self, path, *, allow_large_results=None, **kwargs) -> None:
return self.to_pandas(allow_large_results=allow_large_results).to_pickle(
diff --git a/bigframes/formatting_helpers.py b/bigframes/formatting_helpers.py
index 48afb4fdbd..f75394c47d 100644
--- a/bigframes/formatting_helpers.py
+++ b/bigframes/formatting_helpers.py
@@ -13,11 +13,13 @@
# limitations under the License.
"""Shared helper functions for formatting jobs related info."""
-# TODO(orrbradford): cleanup up typings and documenttion in this file
+
+from __future__ import annotations
import datetime
+import html
import random
-from typing import Any, Optional, Type, Union
+from typing import Any, Optional, Type, TYPE_CHECKING, Union
import bigframes_vendored.constants as constants
import google.api_core.exceptions as api_core_exceptions
@@ -25,7 +27,9 @@
import humanize
import IPython
import IPython.display as display
-import ipywidgets as widgets
+
+if TYPE_CHECKING:
+ import bigframes.core.events
GenericJob = Union[
bigquery.LoadJob, bigquery.ExtractJob, bigquery.QueryJob, bigquery.CopyJob
@@ -58,39 +62,6 @@ def create_exception_with_feedback_link(
return exception(constants.FEEDBACK_LINK)
-def repr_query_job_html(query_job: Optional[bigquery.QueryJob]):
- """Return query job in html format.
- Args:
- query_job (bigquery.QueryJob, Optional):
- The job representing the execution of the query on the server.
- Returns:
- Pywidget html table.
- """
- if query_job is None:
- return display.HTML("No job information available")
- if query_job.dry_run:
- return display.HTML(
- f"Computation deferred. Computation will process {get_formatted_bytes(query_job.total_bytes_processed)}"
- )
- table_html = ""
-    table_html += "<table>"
-    for key, value in query_job_prop_pairs.items():
-        job_val = getattr(query_job, value)
-        if job_val is not None:
-            if key == "Job Id":  # add link to job
-                table_html += f"""<tr><td>{key}</td><td><a target="_blank" href="{get_job_url(query_job)}">{job_val}</a></td></tr>"""
-            elif key == "Slot Time":
-                table_html += (
-                    f"""<tr><td>{key}</td><td>{get_formatted_time(job_val)}</td></tr>"""
-                )
-            elif key == "Bytes Processed":
-                table_html += f"""<tr><td>{key}</td><td>{get_formatted_bytes(job_val)}</td></tr>"""
-            else:
-                table_html += f"""<tr><td>{key}</td><td>{job_val}</td></tr>"""
-    table_html += "</table>"
- return widgets.HTML(table_html)
-
-
def repr_query_job(query_job: Optional[bigquery.QueryJob]):
"""Return query job as a formatted string.
Args:
@@ -109,7 +80,11 @@ def repr_query_job(query_job: Optional[bigquery.QueryJob]):
if job_val is not None:
res += "\n"
if key == "Job Id": # add link to job
- res += f"""Job url: {get_job_url(query_job)}"""
+ res += f"""Job url: {get_job_url(
+ project_id=query_job.project,
+ location=query_job.location,
+ job_id=query_job.job_id,
+ )}"""
elif key == "Slot Time":
res += f"""{key}: {get_formatted_time(job_val)}"""
elif key == "Bytes Processed":
@@ -119,71 +94,90 @@ def repr_query_job(query_job: Optional[bigquery.QueryJob]):
return res
-def wait_for_query_job(
- query_job: bigquery.QueryJob,
- max_results: Optional[int] = None,
- page_size: Optional[int] = None,
- progress_bar: Optional[str] = None,
-) -> bigquery.table.RowIterator:
- """Return query results. Displays a progress bar while the query is running
- Args:
- query_job (bigquery.QueryJob, Optional):
- The job representing the execution of the query on the server.
- max_results (int, Optional):
- The maximum number of rows the row iterator should return.
- page_size (int, Optional):
- The number of results to return on each results page.
- progress_bar (str, Optional):
- Which progress bar to show.
- Returns:
- A row iterator over the query results.
- """
+current_display: Optional[display.HTML] = None
+current_display_id: Optional[str] = None
+previous_display_html: str = ""
+
+
+def progress_callback(
+ event: bigframes.core.events.Event,
+):
+ """Displays a progress bar while the query is running"""
+ global current_display, current_display_id, previous_display_html
+
+ import bigframes._config
+ import bigframes.core.events
+
+ progress_bar = bigframes._config.options.display.progress_bar
+
if progress_bar == "auto":
progress_bar = "notebook" if in_ipython() else "terminal"
- try:
- if progress_bar == "notebook":
- display_id = str(random.random())
- loading_bar = display.HTML(get_query_job_loading_html(query_job))
- display.display(loading_bar, display_id=display_id)
- query_result = query_job.result(
- max_results=max_results, page_size=page_size
+ if progress_bar == "notebook":
+ if (
+ isinstance(event, bigframes.core.events.ExecutionStarted)
+ or current_display is None
+ or current_display_id is None
+ ):
+ previous_display_html = ""
+ current_display_id = str(random.random())
+ current_display = display.HTML("Starting.")
+ display.display(
+ current_display,
+ display_id=current_display_id,
)
- query_job.reload()
+
+ if isinstance(event, bigframes.core.events.BigQuerySentEvent):
+ previous_display_html = render_bqquery_sent_event_html(event)
display.update_display(
- display.HTML(get_query_job_loading_html(query_job)),
- display_id=display_id,
+ display.HTML(previous_display_html),
+ display_id=current_display_id,
)
- elif progress_bar == "terminal":
- initial_loading_bar = get_query_job_loading_string(query_job)
- print(initial_loading_bar)
- query_result = query_job.result(
- max_results=max_results, page_size=page_size
+ elif isinstance(event, bigframes.core.events.BigQueryRetryEvent):
+ previous_display_html = render_bqquery_retry_event_html(event)
+ display.update_display(
+ display.HTML(previous_display_html),
+ display_id=current_display_id,
)
- query_job.reload()
- if initial_loading_bar != get_query_job_loading_string(query_job):
- print(get_query_job_loading_string(query_job))
- else:
- # No progress bar.
- query_result = query_job.result(
- max_results=max_results, page_size=page_size
+ elif isinstance(event, bigframes.core.events.BigQueryReceivedEvent):
+ previous_display_html = render_bqquery_received_event_html(event)
+ display.update_display(
+ display.HTML(previous_display_html),
+ display_id=current_display_id,
)
- query_job.reload()
- return query_result
- except api_core_exceptions.RetryError as exc:
- add_feedback_link(exc)
- raise
- except api_core_exceptions.GoogleAPICallError as exc:
- add_feedback_link(exc)
- raise
- except KeyboardInterrupt:
- query_job.cancel()
- print(
- f"Requested cancellation for {query_job.job_type.capitalize()}"
- f" job {query_job.job_id} in location {query_job.location}..."
- )
- # begin the cancel request before immediately rethrowing
- raise
+ elif isinstance(event, bigframes.core.events.BigQueryFinishedEvent):
+ previous_display_html = render_bqquery_finished_event_html(event)
+ display.update_display(
+ display.HTML(previous_display_html),
+ display_id=current_display_id,
+ )
+ elif isinstance(event, bigframes.core.events.ExecutionFinished):
+ display.update_display(
+ display.HTML(f"✅ Completed. {previous_display_html}"),
+ display_id=current_display_id,
+ )
+ elif isinstance(event, bigframes.core.events.SessionClosed):
+ display.update_display(
+ display.HTML(f"Session {event.session_id} closed."),
+ display_id=current_display_id,
+ )
+ elif progress_bar == "terminal":
+ if isinstance(event, bigframes.core.events.ExecutionStarted):
+ print("Starting execution.")
+ elif isinstance(event, bigframes.core.events.BigQuerySentEvent):
+ message = render_bqquery_sent_event_plaintext(event)
+ print(message)
+ elif isinstance(event, bigframes.core.events.BigQueryRetryEvent):
+ message = render_bqquery_retry_event_plaintext(event)
+ print(message)
+ elif isinstance(event, bigframes.core.events.BigQueryReceivedEvent):
+ message = render_bqquery_received_event_plaintext(event)
+ print(message)
+ elif isinstance(event, bigframes.core.events.BigQueryFinishedEvent):
+ message = render_bqquery_finished_event_plaintext(event)
+ print(message)
+ elif isinstance(event, bigframes.core.events.ExecutionFinished):
+ print("Execution done.")
def wait_for_job(job: GenericJob, progress_bar: Optional[str] = None):
@@ -234,24 +228,74 @@ def wait_for_job(job: GenericJob, progress_bar: Optional[str] = None):
raise
-def get_job_url(query_job: GenericJob):
+def render_query_references(
+ *,
+ project_id: Optional[str],
+ location: Optional[str],
+ job_id: Optional[str],
+ request_id: Optional[str],
+) -> str:
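+    """Return a human-readable reference to the query, if one is available.
+
+    For example (hypothetical values), " with request ID my-project:US.abc123"
+    when only a request ID is known; an empty string otherwise.
+    """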
+ query_id = ""
+ if request_id and not job_id:
+ query_id = f" with request ID {project_id}:{location}.{request_id}"
+ return query_id
+
+
+def render_job_link_html(
+ *,
+ project_id: Optional[str],
+ location: Optional[str],
+ job_id: Optional[str],
+) -> str:
+ job_url = get_job_url(
+ project_id=project_id,
+ location=location,
+ job_id=job_id,
+ )
+ if job_url:
+        job_link = f' <a target="_blank" href="{job_url}">Job {project_id}:{location}.{job_id} details</a>'
+ else:
+ job_link = ""
+ return job_link
+
+
+def render_job_link_plaintext(
+ *,
+ project_id: Optional[str],
+ location: Optional[str],
+ job_id: Optional[str],
+) -> str:
+ job_url = get_job_url(
+ project_id=project_id,
+ location=location,
+ job_id=job_id,
+ )
+ if job_url:
+ job_link = f" Job {project_id}:{location}.{job_id} details: {job_url}"
+ else:
+ job_link = ""
+ return job_link
+
+
+def get_job_url(
+ *,
+ project_id: Optional[str],
+ location: Optional[str],
+ job_id: Optional[str],
+):
"""Return url to the query job in cloud console.
- Args:
- query_job (GenericJob):
- The job representing the execution of the query on the server.
+
Returns:
String url.
"""
- if (
- query_job.project is None
- or query_job.location is None
- or query_job.job_id is None
- ):
+ if project_id is None or location is None or job_id is None:
return None
- return f"""https://console.cloud.google.com/bigquery?project={query_job.project}&j=bq:{query_job.location}:{query_job.job_id}&page=queryresults"""
+ return f"""https://console.cloud.google.com/bigquery?project={project_id}&j=bq:{location}:{job_id}&page=queryresults"""
-def get_query_job_loading_html(query_job: bigquery.QueryJob):
+def render_bqquery_sent_event_html(
+ event: bigframes.core.events.BigQuerySentEvent,
+) -> str:
"""Return progress bar html string
Args:
query_job (bigquery.QueryJob):
@@ -259,18 +303,195 @@ def get_query_job_loading_html(query_job: bigquery.QueryJob):
Returns:
Html string.
"""
- return f"""Query job {query_job.job_id} is {query_job.state}. {get_bytes_processed_string(query_job.total_bytes_processed)}Open Job"""
+ job_link = render_job_link_html(
+ project_id=event.billing_project,
+ location=event.location,
+ job_id=event.job_id,
+ )
+ query_id = render_query_references(
+ project_id=event.billing_project,
+ location=event.location,
+ job_id=event.job_id,
+ request_id=event.request_id,
+ )
+ query_text_details = f"SQL
{html.escape(event.query)} "
+
+ return f"""
+ Query started{query_id}.{job_link}{query_text_details}
+ """
-def get_query_job_loading_string(query_job: bigquery.QueryJob):
- """Return progress bar string
+
+def render_bqquery_sent_event_plaintext(
+ event: bigframes.core.events.BigQuerySentEvent,
+) -> str:
+ """Return progress bar html string
Args:
query_job (bigquery.QueryJob):
The job representing the execution of the query on the server.
Returns:
- String
+ Html string.
+ """
+
+ job_link = render_job_link_plaintext(
+ project_id=event.billing_project,
+ location=event.location,
+ job_id=event.job_id,
+ )
+ query_id = render_query_references(
+ project_id=event.billing_project,
+ location=event.location,
+ job_id=event.job_id,
+ request_id=event.request_id,
+ )
+
+ return f"Query started{query_id}.{job_link}"
+
+
+def render_bqquery_retry_event_html(
+ event: bigframes.core.events.BigQueryRetryEvent,
+) -> str:
+ """Return progress bar html string for retry event."""
+
+ job_link = render_job_link_html(
+ project_id=event.billing_project,
+ location=event.location,
+ job_id=event.job_id,
+ )
+ query_id = render_query_references(
+ project_id=event.billing_project,
+ location=event.location,
+ job_id=event.job_id,
+ request_id=event.request_id,
+ )
+ query_text_details = f"SQL
{html.escape(event.query)} "
+
+ return f"""
+ Retrying query{query_id}.{job_link}{query_text_details}
+ """
+
+
+def render_bqquery_retry_event_plaintext(
+ event: bigframes.core.events.BigQueryRetryEvent,
+) -> str:
+ """Return progress bar plaintext string for retry event."""
+
+ job_link = render_job_link_plaintext(
+ project_id=event.billing_project,
+ location=event.location,
+ job_id=event.job_id,
+ )
+ query_id = render_query_references(
+ project_id=event.billing_project,
+ location=event.location,
+ job_id=event.job_id,
+ request_id=event.request_id,
+ )
+ return f"Retrying query{query_id}.{job_link}"
+
+
+def render_bqquery_received_event_html(
+ event: bigframes.core.events.BigQueryReceivedEvent,
+) -> str:
+ """Return progress bar html string for received event."""
+
+ job_link = render_job_link_html(
+ project_id=event.billing_project,
+ location=event.location,
+ job_id=event.job_id,
+ )
+ query_id = render_query_references(
+ project_id=event.billing_project,
+ location=event.location,
+ job_id=event.job_id,
+ request_id=None,
+ )
+
+ query_plan_details = ""
+ if event.query_plan:
+ plan_str = "\n".join([str(entry) for entry in event.query_plan])
+ query_plan_details = f"Query Plan
{html.escape(plan_str)} "
+
+ return f"""
+ Query{query_id} is {event.state}.{job_link}{query_plan_details}
+ """
+
+
+def render_bqquery_received_event_plaintext(
+ event: bigframes.core.events.BigQueryReceivedEvent,
+) -> str:
+ """Return progress bar plaintext string for received event."""
+
+ job_link = render_job_link_plaintext(
+ project_id=event.billing_project,
+ location=event.location,
+ job_id=event.job_id,
+ )
+ query_id = render_query_references(
+ project_id=event.billing_project,
+ location=event.location,
+ job_id=event.job_id,
+ request_id=None,
+ )
+ return f"Query{query_id} is {event.state}.{job_link}"
+
+
+def render_bqquery_finished_event_html(
+ event: bigframes.core.events.BigQueryFinishedEvent,
+) -> str:
+ """Return progress bar html string for finished event."""
+
+ bytes_str = ""
+ if event.total_bytes_processed is not None:
+ bytes_str = f" {humanize.naturalsize(event.total_bytes_processed)}"
+
+ slot_time_str = ""
+ if event.slot_millis is not None:
+ slot_time = datetime.timedelta(milliseconds=event.slot_millis)
+ slot_time_str = f" in {humanize.naturaldelta(slot_time)} of slot time"
+
+ job_link = render_job_link_html(
+ project_id=event.billing_project,
+ location=event.location,
+ job_id=event.job_id,
+ )
+ query_id = render_query_references(
+ project_id=event.billing_project,
+ location=event.location,
+ job_id=event.job_id,
+ request_id=None,
+ )
+ return f"""
+ Query processed{bytes_str}{slot_time_str}{query_id}.{job_link}
"""
- return f"""Query job {query_job.job_id} is {query_job.state}.{get_bytes_processed_string(query_job.total_bytes_processed)} \n{get_job_url(query_job)}"""
+
+
+def render_bqquery_finished_event_plaintext(
+ event: bigframes.core.events.BigQueryFinishedEvent,
+) -> str:
+ """Return progress bar plaintext string for finished event."""
+
+ bytes_str = ""
+ if event.total_bytes_processed is not None:
+ bytes_str = f" {humanize.naturalsize(event.total_bytes_processed)} processed."
+
+ slot_time_str = ""
+ if event.slot_millis is not None:
+ slot_time = datetime.timedelta(milliseconds=event.slot_millis)
+ slot_time_str = f" Slot time: {humanize.naturaldelta(slot_time)}."
+
+ job_link = render_job_link_plaintext(
+ project_id=event.billing_project,
+ location=event.location,
+ job_id=event.job_id,
+ )
+ query_id = render_query_references(
+ project_id=event.billing_project,
+ location=event.location,
+ job_id=event.job_id,
+ request_id=None,
+ )
+ return f"Query{query_id} finished.{bytes_str}{slot_time_str}{job_link}"
def get_base_job_loading_html(job: GenericJob):
@@ -281,7 +502,11 @@ def get_base_job_loading_html(job: GenericJob):
Returns:
Html string.
"""
- return f"""{job.job_type.capitalize()} job {job.job_id} is {job.state}. Open Job"""
+ return f"""{job.job_type.capitalize()} job {job.job_id} is {job.state}. Open Job"""
def get_base_job_loading_string(job: GenericJob):
@@ -292,7 +517,11 @@ def get_base_job_loading_string(job: GenericJob):
Returns:
String
"""
- return f"""{job.job_type.capitalize()} job {job.job_id} is {job.state}. \n{get_job_url(job)}"""
+ return f"""{job.job_type.capitalize()} job {job.job_id} is {job.state}. \n{get_job_url(
+        project_id=job.project,
+ location=job.location,
+ job_id=job.job_id,
+ )}"""
def get_formatted_time(val):
diff --git a/bigframes/functions/_function_client.py b/bigframes/functions/_function_client.py
index 641bf52dc9..8a88a14040 100644
--- a/bigframes/functions/_function_client.py
+++ b/bigframes/functions/_function_client.py
@@ -145,6 +145,7 @@ def _create_bq_function(self, create_function_ddl: str) -> None:
timeout=None,
metrics=None,
query_with_job=True,
+ publisher=self._session._publisher,
)
logger.info(f"Created bigframes function {query_job.ddl_target_routine}")
diff --git a/bigframes/functions/function.py b/bigframes/functions/function.py
index 99b89131e7..242daf7525 100644
--- a/bigframes/functions/function.py
+++ b/bigframes/functions/function.py
@@ -219,7 +219,13 @@ def __call__(self, *args, **kwargs):
args_string = ", ".join(map(bf_sql.simple_literal, args))
sql = f"SELECT `{str(self._udf_def.routine_ref)}`({args_string})"
- iter, job = bf_io_bigquery.start_query_with_client(self._session.bqclient, sql=sql, query_with_job=True, job_config=bigquery.QueryJobConfig()) # type: ignore
+ iter, job = bf_io_bigquery.start_query_with_client(
+ self._session.bqclient,
+ sql=sql,
+ query_with_job=True,
+ job_config=bigquery.QueryJobConfig(),
+ publisher=self._session._publisher,
+ ) # type: ignore
return list(iter.to_arrow().to_pydict().values())[0][0]
@property
@@ -297,7 +303,13 @@ def __call__(self, *args, **kwargs):
args_string = ", ".join(map(bf_sql.simple_literal, args))
sql = f"SELECT `{str(self._udf_def.routine_ref)}`({args_string})"
- iter, job = bf_io_bigquery.start_query_with_client(self._session.bqclient, sql=sql, query_with_job=True, job_config=bigquery.QueryJobConfig()) # type: ignore
+ iter, job = bf_io_bigquery.start_query_with_client(
+ self._session.bqclient,
+ sql=sql,
+ query_with_job=True,
+ job_config=bigquery.QueryJobConfig(),
+ publisher=self._session._publisher,
+ ) # type: ignore
return list(iter.to_arrow().to_pydict().values())[0][0]
@property
diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py
index 2ea10132bc..2455637b0a 100644
--- a/bigframes/pandas/__init__.py
+++ b/bigframes/pandas/__init__.py
@@ -291,6 +291,7 @@ def clean_up_by_session_id(
session.bqclient,
location=location,
project=project,
+ publisher=session._publisher,
)
bigframes.session._io.bigquery.delete_tables_matching_session_id(
diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index f0cec864b4..6c90838b17 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -67,18 +67,14 @@
import bigframes.constants
import bigframes.core
from bigframes.core import blocks, log_adapter, utils
+import bigframes.core.events
import bigframes.core.pyformat
-
-# Even though the ibis.backends.bigquery import is unused, it's needed
-# to register new and replacement ops with the Ibis BigQuery backend.
+import bigframes.formatting_helpers
import bigframes.functions._function_session as bff_session
import bigframes.functions.function as bff
from bigframes.session import bigquery_session, bq_caching_executor, executor
import bigframes.session._io.bigquery as bf_io_bigquery
-import bigframes.session.anonymous_dataset
import bigframes.session.clients
-import bigframes.session.loader
-import bigframes.session.metrics
import bigframes.session.validation
# Avoid circular imports.
@@ -140,6 +136,11 @@ def __init__(
_warn_if_bf_version_is_obsolete()
+ # Publisher needs to be created before the other objects, especially
+ # the executors, because they access it.
+ self._publisher = bigframes.core.events.Publisher()
+ self._publisher.subscribe(bigframes.formatting_helpers.progress_callback)
+
if context is None:
context = bigquery_options.BigQueryOptions()
@@ -232,12 +233,14 @@ def __init__(
location=self._location,
session_id=self._session_id,
kms_key=self._bq_kms_key_name,
+ publisher=self._publisher,
)
# Session temp tables don't support specifying kms key, so use anon dataset if kms key specified
self._session_resource_manager = (
bigquery_session.SessionResourceManager(
self.bqclient,
self._location,
+ publisher=self._publisher,
)
if (self._bq_kms_key_name is None)
else None
@@ -254,6 +257,7 @@ def __init__(
scan_index_uniqueness=self._strictly_ordered,
force_total_order=self._strictly_ordered,
metrics=self._metrics,
+ publisher=self._publisher,
)
self._executor: executor.Executor = bq_caching_executor.BigQueryCachingExecutor(
bqclient=self._clients_provider.bqclient,
@@ -263,6 +267,7 @@ def __init__(
strictly_ordered=self._strictly_ordered,
metrics=self._metrics,
enable_polars_execution=context.enable_polars_execution,
+ publisher=self._publisher,
)
def __del__(self):
@@ -373,10 +378,16 @@ def close(self):
remote_function_session = getattr(self, "_function_session", None)
if remote_function_session:
- self._function_session.clean_up(
+ remote_function_session.clean_up(
self.bqclient, self.cloudfunctionsclient, self.session_id
)
+ publisher_session = getattr(self, "_publisher", None)
+ if publisher_session:
+ publisher_session.publish(
+ bigframes.core.events.SessionClosed(self.session_id)
+ )
+
@overload
def read_gbq( # type: ignore[overload-overlap]
self,
@@ -2153,6 +2164,7 @@ def _start_query_ml_ddl(
timeout=None,
query_with_job=True,
job_retry=third_party_gcb_retry.DEFAULT_ML_JOB_RETRY,
+ publisher=self._publisher,
)
return iterator, query_job
@@ -2180,6 +2192,7 @@ def _create_object_table(self, path: str, connection: str) -> str:
project=None,
timeout=None,
query_with_job=True,
+ publisher=self._publisher,
)
return table
diff --git a/bigframes/session/_io/bigquery/__init__.py b/bigframes/session/_io/bigquery/__init__.py
index 83f63e8b9a..aa56dc0040 100644
--- a/bigframes/session/_io/bigquery/__init__.py
+++ b/bigframes/session/_io/bigquery/__init__.py
@@ -29,11 +29,13 @@
import google.api_core.exceptions
import google.api_core.retry
import google.cloud.bigquery as bigquery
+import google.cloud.bigquery._job_helpers
+import google.cloud.bigquery.table
from bigframes.core import log_adapter
import bigframes.core.compile.googlesql as googlesql
+import bigframes.core.events
import bigframes.core.sql
-import bigframes.formatting_helpers as formatting_helpers
import bigframes.session.metrics
CHECK_DRIVE_PERMISSIONS = "\nCheck https://cloud.google.com/bigquery/docs/query-drive-data#Google_Drive_permissions."
@@ -238,6 +240,24 @@ def add_and_trim_labels(job_config):
)
+def create_bq_event_callback(publisher):
+ def publish_bq_event(event):
+ if isinstance(event, google.cloud.bigquery._job_helpers.QueryFinishedEvent):
+ bf_event = bigframes.core.events.BigQueryFinishedEvent.from_bqclient(event)
+ elif isinstance(event, google.cloud.bigquery._job_helpers.QueryReceivedEvent):
+ bf_event = bigframes.core.events.BigQueryReceivedEvent.from_bqclient(event)
+ elif isinstance(event, google.cloud.bigquery._job_helpers.QueryRetryEvent):
+ bf_event = bigframes.core.events.BigQueryRetryEvent.from_bqclient(event)
+ elif isinstance(event, google.cloud.bigquery._job_helpers.QuerySentEvent):
+ bf_event = bigframes.core.events.BigQuerySentEvent.from_bqclient(event)
+ else:
+ bf_event = bigframes.core.events.BigQueryUnknownEvent(event)
+
+ publisher.publish(bf_event)
+
+ return publish_bq_event
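+
+# The returned callback is passed as `callback=` to the BigQuery client's
+# `_query_and_wait_bigframes` in `start_query_with_client` below, which
+# invokes it for each query lifecycle event reported by the client library.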
+
+
@overload
def start_query_with_client(
bq_client: bigquery.Client,
@@ -249,7 +269,8 @@ def start_query_with_client(
timeout: Optional[float],
metrics: Optional[bigframes.session.metrics.ExecutionMetrics],
query_with_job: Literal[True],
-) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
+ publisher: bigframes.core.events.Publisher,
+) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]:
...
@@ -264,7 +285,8 @@ def start_query_with_client(
timeout: Optional[float],
metrics: Optional[bigframes.session.metrics.ExecutionMetrics],
query_with_job: Literal[False],
-) -> Tuple[bigquery.table.RowIterator, Optional[bigquery.QueryJob]]:
+ publisher: bigframes.core.events.Publisher,
+) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]:
...
@@ -280,7 +302,8 @@ def start_query_with_client(
metrics: Optional[bigframes.session.metrics.ExecutionMetrics],
query_with_job: Literal[True],
job_retry: google.api_core.retry.Retry,
-) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
+ publisher: bigframes.core.events.Publisher,
+) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]:
...
@@ -296,7 +319,8 @@ def start_query_with_client(
metrics: Optional[bigframes.session.metrics.ExecutionMetrics],
query_with_job: Literal[False],
job_retry: google.api_core.retry.Retry,
-) -> Tuple[bigquery.table.RowIterator, Optional[bigquery.QueryJob]]:
+ publisher: bigframes.core.events.Publisher,
+) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]:
...
@@ -315,23 +339,26 @@ def start_query_with_client(
# https://github.com/googleapis/python-bigquery/pull/2256 merged, likely
# version 3.36.0 or later.
job_retry: google.api_core.retry.Retry = third_party_gcb_retry.DEFAULT_JOB_RETRY,
-) -> Tuple[bigquery.table.RowIterator, Optional[bigquery.QueryJob]]:
+ publisher: bigframes.core.events.Publisher,
+) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]:
"""
Starts query job and waits for results.
"""
+ # Note: Ensure no additional labels are added to job_config after this
+ # point, as `add_and_trim_labels` ensures the label count does not
+ # exceed MAX_LABELS_COUNT.
+ add_and_trim_labels(job_config)
+
try:
- # Note: Ensure no additional labels are added to job_config after this
- # point, as `add_and_trim_labels` ensures the label count does not
- # exceed MAX_LABELS_COUNT.
- add_and_trim_labels(job_config)
if not query_with_job:
- results_iterator = bq_client.query_and_wait(
+ results_iterator = bq_client._query_and_wait_bigframes(
sql,
job_config=job_config,
location=location,
project=project,
api_timeout=timeout,
job_retry=job_retry,
+ callback=create_bq_event_callback(publisher),
)
if metrics is not None:
metrics.count_job_stats(row_iterator=results_iterator)
@@ -350,14 +377,32 @@ def start_query_with_client(
ex.message += CHECK_DRIVE_PERMISSIONS
raise
- opts = bigframes.options.display
- if opts.progress_bar is not None and not query_job.configuration.dry_run:
- results_iterator = formatting_helpers.wait_for_query_job(
- query_job,
- progress_bar=opts.progress_bar,
+ if not query_job.configuration.dry_run:
+ publisher.publish(
+ bigframes.core.events.BigQuerySentEvent(
+ sql,
+ billing_project=query_job.project,
+ location=query_job.location,
+ job_id=query_job.job_id,
+ request_id=None,
+ )
+ )
+ results_iterator = query_job.result()
+ if not query_job.configuration.dry_run:
+ publisher.publish(
+ bigframes.core.events.BigQueryFinishedEvent(
+ billing_project=query_job.project,
+ location=query_job.location,
+ job_id=query_job.job_id,
+ destination=query_job.destination,
+ total_rows=results_iterator.total_rows,
+ total_bytes_processed=query_job.total_bytes_processed,
+ slot_millis=query_job.slot_millis,
+ created=query_job.created,
+ started=query_job.started,
+ ended=query_job.ended,
+ )
)
- else:
- results_iterator = query_job.result()
if metrics is not None:
metrics.count_job_stats(query_job=query_job)
@@ -399,6 +444,8 @@ def create_bq_dataset_reference(
bq_client: bigquery.Client,
location: Optional[str] = None,
project: Optional[str] = None,
+ *,
+ publisher: bigframes.core.events.Publisher,
) -> bigquery.DatasetReference:
"""Create and identify dataset(s) for temporary BQ resources.
@@ -430,6 +477,7 @@ def create_bq_dataset_reference(
timeout=None,
metrics=None,
query_with_job=True,
+ publisher=publisher,
)
# The anonymous dataset is used by BigQuery to write query results and
diff --git a/bigframes/session/_io/bigquery/read_gbq_table.py b/bigframes/session/_io/bigquery/read_gbq_table.py
index 00531ce25d..f8a379aee9 100644
--- a/bigframes/session/_io/bigquery/read_gbq_table.py
+++ b/bigframes/session/_io/bigquery/read_gbq_table.py
@@ -26,8 +26,9 @@
import bigframes_vendored.constants as constants
import google.api_core.exceptions
import google.cloud.bigquery as bigquery
+import google.cloud.bigquery.table
-import bigframes.core.sql
+import bigframes.core.events
import bigframes.exceptions as bfe
import bigframes.session._io.bigquery
@@ -43,6 +44,7 @@ def get_table_metadata(
*,
cache: Dict[bigquery.TableReference, Tuple[datetime.datetime, bigquery.Table]],
use_cache: bool = True,
+ publisher: bigframes.core.events.Publisher,
) -> Tuple[datetime.datetime, google.cloud.bigquery.table.Table]:
"""Get the table metadata, either from cache or via REST API."""
@@ -59,6 +61,7 @@ def get_table_metadata(
# Don't warn, because that will already have been taken care of.
should_warn=False,
should_dry_run=False,
+ publisher=publisher,
):
# This warning should only happen if the cached snapshot_time will
# have any effect on bigframes (b/437090788). For example, with
@@ -101,13 +104,14 @@ def get_table_metadata(
def is_time_travel_eligible(
bqclient: bigquery.Client,
- table: bigquery.table.Table,
+ table: google.cloud.bigquery.table.Table,
columns: Optional[Sequence[str]],
snapshot_time: datetime.datetime,
filter_str: Optional[str] = None,
*,
should_warn: bool,
should_dry_run: bool,
+ publisher: bigframes.core.events.Publisher,
):
"""Check if a table is eligible to use time-travel.
@@ -184,6 +188,7 @@ def is_time_travel_eligible(
timeout=None,
metrics=None,
query_with_job=False,
+ publisher=publisher,
)
return True
@@ -210,10 +215,8 @@ def is_time_travel_eligible(
def infer_unique_columns(
- bqclient: bigquery.Client,
- table: bigquery.table.Table,
+ table: google.cloud.bigquery.table.Table,
index_cols: List[str],
- metadata_only: bool = False,
) -> Tuple[str, ...]:
"""Return a set of columns that can provide a unique row key or empty if none can be inferred.
@@ -227,14 +230,37 @@ def infer_unique_columns(
# Essentially, just reordering the primary key to match the index col order
return tuple(index_col for index_col in index_cols if index_col in primary_keys)
- if primary_keys or metadata_only or (not index_cols):
- # Sometimes not worth scanning data to check uniqueness
+ if primary_keys:
return primary_keys
+
+ return ()
+
+
+def check_if_index_columns_are_unique(
+ bqclient: bigquery.Client,
+ table: google.cloud.bigquery.table.Table,
+ index_cols: List[str],
+ *,
+ publisher: bigframes.core.events.Publisher,
+) -> Tuple[str, ...]:
+ import bigframes.core.sql
+ import bigframes.session._io.bigquery
+
# TODO(b/337925142): Avoid a "SELECT *" subquery here by ensuring
# table_expression only selects just index_cols.
is_unique_sql = bigframes.core.sql.is_distinct_sql(index_cols, table.reference)
job_config = bigquery.QueryJobConfig()
- results = bqclient.query_and_wait(is_unique_sql, job_config=job_config)
+ results, _ = bigframes.session._io.bigquery.start_query_with_client(
+ bq_client=bqclient,
+ sql=is_unique_sql,
+ job_config=job_config,
+ timeout=None,
+ location=None,
+ project=None,
+ metrics=None,
+ query_with_job=False,
+ publisher=publisher,
+ )
row = next(iter(results))
if row["total_count"] == row["distinct_count"]:
@@ -243,7 +269,7 @@ def infer_unique_columns(
def _get_primary_keys(
- table: bigquery.table.Table,
+ table: google.cloud.bigquery.table.Table,
) -> List[str]:
"""Get primary keys from table if they are set."""
@@ -261,7 +287,7 @@ def _get_primary_keys(
def _is_table_clustered_or_partitioned(
- table: bigquery.table.Table,
+ table: google.cloud.bigquery.table.Table,
) -> bool:
"""Returns True if the table is clustered or partitioned."""
@@ -284,7 +310,7 @@ def _is_table_clustered_or_partitioned(
def get_index_cols(
- table: bigquery.table.Table,
+ table: google.cloud.bigquery.table.Table,
index_col: Iterable[str]
| str
| Iterable[int]
diff --git a/bigframes/session/anonymous_dataset.py b/bigframes/session/anonymous_dataset.py
index ec624d4eb4..3c1757806b 100644
--- a/bigframes/session/anonymous_dataset.py
+++ b/bigframes/session/anonymous_dataset.py
@@ -20,6 +20,7 @@
import google.cloud.bigquery as bigquery
from bigframes import constants
+import bigframes.core.events
from bigframes.session import temporary_storage
import bigframes.session._io.bigquery as bf_io_bigquery
@@ -37,10 +38,12 @@ def __init__(
location: str,
session_id: str,
*,
- kms_key: Optional[str] = None
+ kms_key: Optional[str] = None,
+ publisher: bigframes.core.events.Publisher,
):
self.bqclient = bqclient
self._location = location
+ self._publisher = publisher
self.session_id = session_id
self._table_ids: List[bigquery.TableReference] = []
@@ -62,6 +65,7 @@ def dataset(self) -> bigquery.DatasetReference:
self._datset_ref = bf_io_bigquery.create_bq_dataset_reference(
self.bqclient,
location=self._location,
+ publisher=self._publisher,
)
return self._datset_ref
diff --git a/bigframes/session/bigquery_session.py b/bigframes/session/bigquery_session.py
index 883087df07..99c13007d8 100644
--- a/bigframes/session/bigquery_session.py
+++ b/bigframes/session/bigquery_session.py
@@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from __future__ import annotations
+
import datetime
import logging
import threading
@@ -23,7 +25,9 @@
import google.cloud.bigquery as bigquery
from bigframes.core.compile import googlesql
+import bigframes.core.events
from bigframes.session import temporary_storage
+import bigframes.session._io.bigquery as bfbqio
KEEPALIVE_QUERY_TIMEOUT_SECONDS = 5.0
@@ -38,12 +42,19 @@ class SessionResourceManager(temporary_storage.TemporaryStorageManager):
Responsible for allocating and cleaning up temporary gbq tables used by a BigFrames session.
"""
- def __init__(self, bqclient: bigquery.Client, location: str):
+ def __init__(
+ self,
+ bqclient: bigquery.Client,
+ location: str,
+ *,
+ publisher: bigframes.core.events.Publisher,
+ ):
self.bqclient = bqclient
self._location = location
self._session_id: Optional[str] = None
self._sessiondaemon: Optional[RecurringTaskDaemon] = None
self._session_lock = threading.RLock()
+ self._publisher = publisher
@property
def location(self):
@@ -84,21 +95,38 @@ def create_temp_table(
ddl = f"CREATE TEMP TABLE `_SESSION`.{googlesql.identifier(table_ref.table_id)} ({fields_string}){cluster_string}"
- job = self.bqclient.query(
- ddl, job_config=job_config, location=self.location
+ _, job = bfbqio.start_query_with_client(
+ self.bqclient,
+ ddl,
+ job_config=job_config,
+ location=self.location,
+ project=None,
+ timeout=None,
+ metrics=None,
+ query_with_job=True,
+ publisher=self._publisher,
)
job.result()
# return the fully qualified table, so it can be used outside of the session
- return job.destination
+ destination = job.destination
+        assert destination is not None, "Failed to create temp table."
+ return destination
def close(self):
if self._sessiondaemon is not None:
self._sessiondaemon.stop()
if self._session_id is not None and self.bqclient is not None:
- self.bqclient.query_and_wait(
+ bfbqio.start_query_with_client(
+ self.bqclient,
f"CALL BQ.ABORT_SESSION('{self._session_id}')",
+ job_config=bigquery.QueryJobConfig(),
location=self.location,
+ project=None,
+ timeout=None,
+ metrics=None,
+ query_with_job=False,
+ publisher=self._publisher,
)
def _get_session_id(self) -> str:
@@ -109,8 +137,16 @@ def _get_session_id(self) -> str:
job_config = bigquery.QueryJobConfig(create_session=True)
# Make sure the session is a new one, not one associated with another query.
job_config.use_query_cache = False
- query_job = self.bqclient.query(
- "SELECT 1", job_config=job_config, location=self.location
+ _, query_job = bfbqio.start_query_with_client(
+ self.bqclient,
+ "SELECT 1",
+ job_config=job_config,
+ location=self.location,
+ project=None,
+ timeout=None,
+ metrics=None,
+ query_with_job=True,
+ publisher=self._publisher,
)
query_job.result() # blocks until finished
assert query_job.session_info is not None
@@ -133,11 +169,16 @@ def _keep_session_alive(self):
]
)
try:
- self.bqclient.query_and_wait(
+ bfbqio.start_query_with_client(
+ self.bqclient,
"SELECT 1",
- location=self.location,
job_config=job_config,
- wait_timeout=KEEPALIVE_QUERY_TIMEOUT_SECONDS,
+ location=self.location,
+ project=None,
+ timeout=KEEPALIVE_QUERY_TIMEOUT_SECONDS,
+ metrics=None,
+ query_with_job=False,
+ publisher=self._publisher,
)
except Exception as e:
logging.warning("BigQuery session keep-alive query errored : %s", e)
diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py
index cbda9bc640..d4cfa13aa4 100644
--- a/bigframes/session/bq_caching_executor.py
+++ b/bigframes/session/bq_caching_executor.py
@@ -33,6 +33,7 @@
import bigframes.core
from bigframes.core import compile, local_data, rewrite
import bigframes.core.compile.sqlglot.sqlglot_ir as sqlglot_ir
+import bigframes.core.events
import bigframes.core.guid
import bigframes.core.identifiers
import bigframes.core.nodes as nodes
@@ -140,6 +141,7 @@ def __init__(
strictly_ordered: bool = True,
metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None,
enable_polars_execution: bool = False,
+ publisher: bigframes.core.events.Publisher,
):
self.bqclient = bqclient
self.storage_manager = storage_manager
@@ -149,6 +151,9 @@ def __init__(
self.loader = loader
self.bqstoragereadclient = bqstoragereadclient
self._enable_polars_execution = enable_polars_execution
+ self._publisher = publisher
+
+ # TODO(tswast): Send events from semi-executors, too.
self._semi_executors: Sequence[semi_executor.SemiExecutor] = (
read_api_execution.ReadApiSemiExecutor(
bqstoragereadclient=bqstoragereadclient,
@@ -188,6 +193,8 @@ def execute(
array_value: bigframes.core.ArrayValue,
execution_spec: ex_spec.ExecutionSpec,
) -> executor.ExecuteResult:
+ self._publisher.publish(bigframes.core.events.ExecutionStarted())
+
# TODO: Support export jobs in combination with semi executors
if execution_spec.destination_spec is None:
plan = self.prepare_plan(array_value.node, target="simplify")
@@ -196,6 +203,11 @@ def execute(
plan, ordered=execution_spec.ordered, peek=execution_spec.peek
)
if maybe_result:
+ self._publisher.publish(
+ bigframes.core.events.ExecutionFinished(
+ result=maybe_result,
+ )
+ )
return maybe_result
if isinstance(execution_spec.destination_spec, ex_spec.TableOutputSpec):
@@ -204,7 +216,13 @@ def execute(
"Ordering and peeking not supported for gbq export"
)
# separate path for export_gbq, as it has all sorts of annoying logic, such as possibly running as dml
- return self._export_gbq(array_value, execution_spec.destination_spec)
+ result = self._export_gbq(array_value, execution_spec.destination_spec)
+ self._publisher.publish(
+ bigframes.core.events.ExecutionFinished(
+ result=result,
+ )
+ )
+ return result
result = self._execute_plan_gbq(
array_value.node,
@@ -219,6 +237,11 @@ def execute(
if isinstance(execution_spec.destination_spec, ex_spec.GcsOutputSpec):
self._export_result_gcs(result, execution_spec.destination_spec)
+ self._publisher.publish(
+ bigframes.core.events.ExecutionFinished(
+ result=result,
+ )
+ )
return result
def _export_result_gcs(
@@ -243,6 +266,7 @@ def _export_result_gcs(
location=None,
timeout=None,
query_with_job=True,
+ publisher=self._publisher,
)
def _maybe_find_existing_table(
@@ -404,6 +428,7 @@ def _run_execute_query(
location=None,
timeout=None,
query_with_job=True,
+ publisher=self._publisher,
)
else:
return bq_io.start_query_with_client(
@@ -415,6 +440,7 @@ def _run_execute_query(
location=None,
timeout=None,
query_with_job=False,
+ publisher=self._publisher,
)
except google.api_core.exceptions.BadRequest as e:
diff --git a/bigframes/session/direct_gbq_execution.py b/bigframes/session/direct_gbq_execution.py
index 7538c9300f..9e7db87301 100644
--- a/bigframes/session/direct_gbq_execution.py
+++ b/bigframes/session/direct_gbq_execution.py
@@ -21,6 +21,7 @@
from bigframes.core import compile, nodes
from bigframes.core.compile import sqlglot
+import bigframes.core.events
from bigframes.session import executor, semi_executor
import bigframes.session._io.bigquery as bq_io
@@ -31,7 +32,11 @@
# reference for validating more complex executors.
class DirectGbqExecutor(semi_executor.SemiExecutor):
def __init__(
- self, bqclient: bigquery.Client, compiler: Literal["ibis", "sqlglot"] = "ibis"
+ self,
+ bqclient: bigquery.Client,
+ compiler: Literal["ibis", "sqlglot"] = "ibis",
+ *,
+ publisher: bigframes.core.events.Publisher,
):
self.bqclient = bqclient
self._compile_fn = (
@@ -39,6 +44,7 @@ def __init__(
if compiler == "ibis"
else sqlglot.SQLGlotCompiler()._compile_sql
)
+ self._publisher = publisher
def execute(
self,
@@ -83,4 +89,5 @@ def _run_execute_query(
timeout=None,
metrics=None,
query_with_job=False,
+ publisher=self._publisher,
)
diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py
index 94d8db6f36..940fdc1352 100644
--- a/bigframes/session/loader.py
+++ b/bigframes/session/loader.py
@@ -50,6 +50,7 @@
from bigframes.core import guid, identifiers, local_data, nodes, ordering, utils
import bigframes.core as core
import bigframes.core.blocks as blocks
+import bigframes.core.events
import bigframes.core.schema as schemata
import bigframes.dtypes
import bigframes.formatting_helpers as formatting_helpers
@@ -262,6 +263,8 @@ def __init__(
scan_index_uniqueness: bool,
force_total_order: bool,
metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None,
+ *,
+ publisher: bigframes.core.events.Publisher,
):
self._bqclient = bqclient
self._write_client = write_client
@@ -273,6 +276,7 @@ def __init__(
bigquery.TableReference, Tuple[datetime.datetime, bigquery.Table]
] = {}
self._metrics = metrics
+ self._publisher = publisher
# Unfortunate circular reference, but need to pass reference when constructing objects
self._session = session
self._clock = session_time.BigQuerySyncedClock(bqclient)
@@ -499,6 +503,7 @@ def read_gbq_table( # type: ignore[overload-overlap]
force_total_order: Optional[bool] = ...,
n_rows: Optional[int] = None,
index_col_in_columns: bool = False,
+ publish_execution: bool = True,
) -> dataframe.DataFrame:
...
@@ -522,6 +527,7 @@ def read_gbq_table(
force_total_order: Optional[bool] = ...,
n_rows: Optional[int] = None,
index_col_in_columns: bool = False,
+ publish_execution: bool = True,
) -> pandas.Series:
...
@@ -544,6 +550,7 @@ def read_gbq_table(
force_total_order: Optional[bool] = None,
n_rows: Optional[int] = None,
index_col_in_columns: bool = False,
+ publish_execution: bool = True,
) -> dataframe.DataFrame | pandas.Series:
"""Read a BigQuery table into a BigQuery DataFrames DataFrame.
@@ -603,8 +610,12 @@ def read_gbq_table(
when the index is selected from the data columns (e.g., in a
``read_csv`` scenario). The column will be used as the
DataFrame's index and removed from the list of value columns.
+            publish_execution (bool, optional):
+                If True, publishes execution started and finished events
+                when this function runs a query. Set to False when calling
+                read_gbq_table from another function that already reports
+                execution progress.
"""
- import bigframes._tools.strings
+ import bigframes.core.events
import bigframes.dataframe as dataframe
# ---------------------------------
@@ -636,6 +647,7 @@ def read_gbq_table(
bq_time=self._clock.get_time(),
cache=self._df_snapshot,
use_cache=use_cache,
+ publisher=self._publisher,
)
if table.location.casefold() != self._storage_manager.location.casefold():
@@ -756,6 +768,7 @@ def read_gbq_table(
filter_str,
should_warn=True,
should_dry_run=True,
+ publisher=self._publisher,
)
# ----------------------------
@@ -768,12 +781,27 @@ def read_gbq_table(
# TODO(b/338065601): Provide a way to assume uniqueness and avoid this
# check.
primary_key = bf_read_gbq_table.infer_unique_columns(
- bqclient=self._bqclient,
table=table,
index_cols=index_cols,
- # If non in strict ordering mode, don't go through overhead of scanning index column(s) to determine if unique
- metadata_only=not self._scan_index_uniqueness,
)
+
+        # If not in strict ordering mode, don't incur the overhead of scanning
+        # index column(s) to determine whether they are unique.
+ if not primary_key and self._scan_index_uniqueness and index_cols:
+ if publish_execution:
+ self._publisher.publish(
+ bigframes.core.events.ExecutionStarted(),
+ )
+ primary_key = bf_read_gbq_table.check_if_index_columns_are_unique(
+ self._bqclient,
+ table=table,
+ index_cols=index_cols,
+ publisher=self._publisher,
+ )
+ if publish_execution:
+ self._publisher.publish(
+ bigframes.core.events.ExecutionFinished(),
+ )
+
schema = schemata.ArraySchema.from_bq_table(table)
if not include_all_columns:
schema = schema.select(index_cols + columns)
@@ -991,6 +1019,12 @@ def read_gbq_query(
query_job, list(columns), index_cols
)
+ # We want to make sure we show progress when we actually do execute a
+ # query. Since we have got this far, we know it's not a dry run.
+ self._publisher.publish(
+ bigframes.core.events.ExecutionStarted(),
+ )
+
query_job_for_metrics: Optional[bigquery.QueryJob] = None
destination: Optional[bigquery.TableReference] = None
@@ -1046,20 +1080,28 @@ def read_gbq_query(
# makes sense to download the results beyond the first page, even if
# there is a job and destination table available.
if query_job_for_metrics is None and rows is not None:
- return bf_read_gbq_query.create_dataframe_from_row_iterator(
+ df = bf_read_gbq_query.create_dataframe_from_row_iterator(
rows,
session=self._session,
index_col=index_col,
columns=columns,
)
+ self._publisher.publish(
+ bigframes.core.events.ExecutionFinished(),
+ )
+ return df
# We already checked rows, so if there's no destination table, then
# there are no results to return.
if destination is None:
- return bf_read_gbq_query.create_dataframe_from_query_job_stats(
+ df = bf_read_gbq_query.create_dataframe_from_query_job_stats(
query_job_for_metrics,
session=self._session,
)
+ self._publisher.publish(
+ bigframes.core.events.ExecutionFinished(),
+ )
+ return df
# If the query was DDL or DML, return some job metadata. See
# https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.statement_type
@@ -1070,10 +1112,14 @@ def read_gbq_query(
query_job_for_metrics is not None
and not bf_read_gbq_query.should_return_query_results(query_job_for_metrics)
):
- return bf_read_gbq_query.create_dataframe_from_query_job_stats(
+ df = bf_read_gbq_query.create_dataframe_from_query_job_stats(
query_job_for_metrics,
session=self._session,
)
+ self._publisher.publish(
+ bigframes.core.events.ExecutionFinished(),
+ )
+ return df
# Speed up counts by getting counts from result metadata.
if rows is not None:
@@ -1083,16 +1129,21 @@ def read_gbq_query(
else:
n_rows = None
- return self.read_gbq_table(
+ df = self.read_gbq_table(
f"{destination.project}.{destination.dataset_id}.{destination.table_id}",
index_col=index_col,
columns=columns,
use_cache=configuration["query"]["useQueryCache"],
force_total_order=force_total_order,
n_rows=n_rows,
+ publish_execution=False,
# max_results and filters are omitted because they are already
# handled by to_query(), above.
)
+ self._publisher.publish(
+ bigframes.core.events.ExecutionFinished(),
+ )
+ return df
def _query_to_destination(
self,
@@ -1194,6 +1245,7 @@ def _start_query_with_job_optional(
project=None,
metrics=None,
query_with_job=False,
+ publisher=self._publisher,
)
return rows
@@ -1219,6 +1271,7 @@ def _start_query_with_job(
project=None,
metrics=None,
query_with_job=True,
+ publisher=self._publisher,
)
return query_job
diff --git a/bigframes/testing/mocks.py b/bigframes/testing/mocks.py
index 8d9997b1df..ff210419fd 100644
--- a/bigframes/testing/mocks.py
+++ b/bigframes/testing/mocks.py
@@ -143,6 +143,7 @@ def query_and_wait_mock(query, *args, job_config=None, **kwargs):
bqclient.query.side_effect = query_mock
bqclient.query_and_wait.side_effect = query_and_wait_mock
+ bqclient._query_and_wait_bigframes.side_effect = query_and_wait_mock
clients_provider = mock.create_autospec(bigframes.session.clients.ClientsProvider)
type(clients_provider).bqclient = mock.PropertyMock(return_value=bqclient)
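
The mock now routes the private `_query_and_wait_bigframes` helper through the same stub as `query_and_wait`. A hedged sketch of a test leaning on that wiring; `create_bigquery_session` comes from this module, but the exact entry point that exercises the helper may differ:

```python
from bigframes.testing import mocks

session = mocks.create_bigquery_session()
# Any operation that runs a query should now hit the patched helper too
# (illustrative call; the real trigger depends on the code under test).
session.read_gbq("SELECT 1 AS x")
session.bqclient._query_and_wait_bigframes.assert_called()
```
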
diff --git a/setup.py b/setup.py
index 2aef514749..abc760b691 100644
--- a/setup.py
+++ b/setup.py
@@ -39,7 +39,7 @@
"gcsfs >=2023.3.0, !=2025.5.0",
"geopandas >=0.12.2",
"google-auth >=2.15.0,<3.0",
- "google-cloud-bigquery[bqstorage,pandas] >=3.31.0",
+ "google-cloud-bigquery[bqstorage,pandas] >=3.36.0",
# 2.30 needed for arrow support.
"google-cloud-bigquery-storage >= 2.30.0, < 3.0.0",
"google-cloud-functions >=1.12.0",
diff --git a/testing/constraints-3.9.txt b/testing/constraints-3.9.txt
index 8df3a3a2c3..eceec07dc4 100644
--- a/testing/constraints-3.9.txt
+++ b/testing/constraints-3.9.txt
@@ -6,7 +6,7 @@ geopandas==0.12.2
google-auth==2.15.0
google-cloud-bigtable==2.24.0
google-cloud-pubsub==2.21.4
-google-cloud-bigquery==3.31.0
+google-cloud-bigquery==3.36.0
google-cloud-functions==1.12.0
google-cloud-bigquery-connection==1.12.0
google-cloud-iam==2.12.1
diff --git a/tests/system/small/engines/conftest.py b/tests/system/small/engines/conftest.py
index 9699cc6a61..a775731cde 100644
--- a/tests/system/small/engines/conftest.py
+++ b/tests/system/small/engines/conftest.py
@@ -19,7 +19,7 @@
import pytest
import bigframes
-from bigframes.core import ArrayValue, local_data
+from bigframes.core import ArrayValue, events, local_data
from bigframes.session import (
direct_gbq_execution,
local_scan_executor,
@@ -50,11 +50,14 @@ def engine(request, bigquery_client: bigquery.Client) -> semi_executor.SemiExecu
return local_scan_executor.LocalScanExecutor()
if request.param == "polars":
return polars_executor.PolarsExecutor()
+ publisher = events.Publisher()
if request.param == "bq":
- return direct_gbq_execution.DirectGbqExecutor(bigquery_client)
+ return direct_gbq_execution.DirectGbqExecutor(
+ bigquery_client, publisher=publisher
+ )
if request.param == "bq-sqlglot":
return direct_gbq_execution.DirectGbqExecutor(
- bigquery_client, compiler="sqlglot"
+ bigquery_client, compiler="sqlglot", publisher=publisher
)
raise ValueError(f"Unrecognized param: {request.param}")
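
With the fixture threading an explicit `Publisher` into the BigQuery-backed executors, a test can attach a recording subscriber and assert on emitted events. A sketch under that assumption; the recorder itself is illustrative, not part of the fixture:

```python
from bigframes.core import events

recorded: list = []
publisher = events.Publisher()
# Subscriber is a context manager: it unsubscribes itself on exit.
with publisher.subscribe(recorded.append):
    publisher.publish(events.ExecutionStarted())
    publisher.publish(events.ExecutionFinished())
assert any(isinstance(e, events.ExecutionStarted) for e in recorded)
```
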
diff --git a/tests/system/small/engines/test_aggregation.py b/tests/system/small/engines/test_aggregation.py
index 9b4efe8cbe..4225d5dff7 100644
--- a/tests/system/small/engines/test_aggregation.py
+++ b/tests/system/small/engines/test_aggregation.py
@@ -15,7 +15,14 @@
from google.cloud import bigquery
import pytest
-from bigframes.core import agg_expressions, array_value, expression, identifiers, nodes
+from bigframes.core import (
+ agg_expressions,
+ array_value,
+ events,
+ expression,
+ identifiers,
+ nodes,
+)
import bigframes.operations.aggregations as agg_ops
from bigframes.session import direct_gbq_execution, polars_executor
from bigframes.testing.engine_utils import assert_equivalence_execution
@@ -93,9 +100,12 @@ def test_sql_engines_median_op_aggregates(
scalars_array_value,
agg_ops.MedianOp(),
).node
- left_engine = direct_gbq_execution.DirectGbqExecutor(bigquery_client)
+ publisher = events.Publisher()
+ left_engine = direct_gbq_execution.DirectGbqExecutor(
+ bigquery_client, publisher=publisher
+ )
right_engine = direct_gbq_execution.DirectGbqExecutor(
- bigquery_client, compiler="sqlglot"
+ bigquery_client, compiler="sqlglot", publisher=publisher
)
assert_equivalence_execution(node, left_engine, right_engine)
diff --git a/tests/system/small/engines/test_windowing.py b/tests/system/small/engines/test_windowing.py
index f344a3b60a..a34d7b8f38 100644
--- a/tests/system/small/engines/test_windowing.py
+++ b/tests/system/small/engines/test_windowing.py
@@ -18,6 +18,7 @@
from bigframes.core import (
agg_expressions,
array_value,
+ events,
expression,
identifiers,
nodes,
@@ -64,8 +65,11 @@ def test_engines_with_rows_window(
skip_reproject_unsafe=False,
)
- bq_executor = direct_gbq_execution.DirectGbqExecutor(bigquery_client)
+ publisher = events.Publisher()
+ bq_executor = direct_gbq_execution.DirectGbqExecutor(
+ bigquery_client, publisher=publisher
+ )
bq_sqlgot_executor = direct_gbq_execution.DirectGbqExecutor(
- bigquery_client, compiler="sqlglot"
+ bigquery_client, compiler="sqlglot", publisher=publisher
)
assert_equivalence_execution(window_node, bq_executor, bq_sqlgot_executor)
diff --git a/tests/system/small/functions/test_remote_function.py b/tests/system/small/functions/test_remote_function.py
index 28fab19144..f81bdf8931 100644
--- a/tests/system/small/functions/test_remote_function.py
+++ b/tests/system/small/functions/test_remote_function.py
@@ -27,6 +27,7 @@
import bigframes
import bigframes.clients
+import bigframes.core.events
import bigframes.dtypes
import bigframes.exceptions
from bigframes.functions import _utils as bff_utils
@@ -769,6 +770,7 @@ def test_read_gbq_function_runs_existing_udf_array_output(session, routine_id_un
timeout=None,
metrics=None,
query_with_job=True,
+ publisher=bigframes.core.events.Publisher(),
)
func = session.read_gbq_function(routine_id_unique)
@@ -807,6 +809,7 @@ def test_read_gbq_function_runs_existing_udf_2_params_array_output(
timeout=None,
metrics=None,
query_with_job=True,
+ publisher=bigframes.core.events.Publisher(),
)
func = session.read_gbq_function(routine_id_unique)
@@ -847,6 +850,7 @@ def test_read_gbq_function_runs_existing_udf_4_params_array_output(
timeout=None,
metrics=None,
query_with_job=True,
+ publisher=bigframes.core.events.Publisher(),
)
func = session.read_gbq_function(routine_id_unique)
diff --git a/tests/system/small/test_bq_sessions.py b/tests/system/small/test_bq_sessions.py
index 7aad19bd8f..801346600d 100644
--- a/tests/system/small/test_bq_sessions.py
+++ b/tests/system/small/test_bq_sessions.py
@@ -17,10 +17,10 @@
import google
import google.api_core.exceptions
-import google.cloud
from google.cloud import bigquery
import pytest
+import bigframes.core.events
from bigframes.session import bigquery_session
TEST_SCHEMA = [
@@ -39,12 +39,14 @@
def session_resource_manager(
bigquery_client,
) -> bigquery_session.SessionResourceManager:
- return bigquery_session.SessionResourceManager(bigquery_client, "US")
+ return bigquery_session.SessionResourceManager(
+ bigquery_client, "US", publisher=bigframes.core.events.Publisher()
+ )
def test_bq_session_create_temp_table_clustered(bigquery_client: bigquery.Client):
session_resource_manager = bigquery_session.SessionResourceManager(
- bigquery_client, "US"
+ bigquery_client, "US", publisher=bigframes.core.events.Publisher()
)
cluster_cols = ["string field", "bool field"]
@@ -68,7 +70,7 @@ def test_bq_session_create_temp_table_clustered(bigquery_client: bigquery.Client
def test_bq_session_create_multi_temp_tables(bigquery_client: bigquery.Client):
session_resource_manager = bigquery_session.SessionResourceManager(
- bigquery_client, "US"
+ bigquery_client, "US", publisher=bigframes.core.events.Publisher()
)
def create_table():
diff --git a/tests/system/small/test_progress_bar.py b/tests/system/small/test_progress_bar.py
index 8a323831b5..0c9c4070f4 100644
--- a/tests/system/small/test_progress_bar.py
+++ b/tests/system/small/test_progress_bar.py
@@ -23,7 +23,7 @@
import bigframes.formatting_helpers as formatting_helpers
from bigframes.session import MAX_INLINE_DF_BYTES
-job_load_message_regex = r"\w+ job [\w-]+ is \w+\."
+job_load_message_regex = r"Query"
EXPECTED_DRY_RUN_MESSAGE = "Computation deferred. Computation will process"
@@ -56,7 +56,7 @@ def test_progress_bar_scalar(penguins_df_default_index: bf.dataframe.DataFrame,
with bf.option_context("display.progress_bar", "terminal"):
penguins_df_default_index["body_mass_g"].head(10).mean()
- assert capsys.readouterr().out == ""
+ assert_loading_msg_exist(capsys.readouterr().out)
def test_progress_bar_scalar_allow_large_results(
@@ -100,37 +100,19 @@ def test_progress_bar_load_jobs(
capsys.readouterr() # clear output
session.read_csv(path)
- assert_loading_msg_exist(capsys.readouterr().out)
+ assert_loading_msg_exist(capsys.readouterr().out, pattern="Load")
-def assert_loading_msg_exist(capystOut: str, pattern=job_load_message_regex):
- numLoadingMsg = 0
- lines = capystOut.split("\n")
+def assert_loading_msg_exist(capstdout: str, pattern=job_load_message_regex):
+ num_loading_msg = 0
+ lines = capstdout.split("\n")
lines = [line for line in lines if len(line) > 0]
assert len(lines) > 0
for line in lines:
- if re.match(pattern, line) is not None:
- numLoadingMsg += 1
- assert numLoadingMsg > 0
-
-
-def test_query_job_repr_html(penguins_df_default_index: bf.dataframe.DataFrame):
- with bf.option_context("display.progress_bar", "terminal"):
- penguins_df_default_index.to_pandas(allow_large_results=True)
- query_job_repr = formatting_helpers.repr_query_job_html(
- penguins_df_default_index.query_job
- ).value
-
- string_checks = [
- "Job Id",
- "Destination Table",
- "Slot Time",
- "Bytes Processed",
- "Cache hit",
- ]
- for string in string_checks:
- assert string in query_job_repr
+ if re.search(pattern, line) is not None:
+ num_loading_msg += 1
+ assert num_loading_msg > 0
def test_query_job_repr(penguins_df_default_index: bf.dataframe.DataFrame):
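
The helper's switch from `re.match` to `re.search` is load-bearing: `re.match` only matches at the start of the string, and the new progress messages don't necessarily begin with the pattern. A quick illustration:

```python
import re

line = "my-job-id: Query processed 1.0 kB"
assert re.match(r"Query", line) is None       # anchored at position 0
assert re.search(r"Query", line) is not None  # scans the whole line
```
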
diff --git a/tests/unit/session/test_io_bigquery.py b/tests/unit/session/test_io_bigquery.py
index c451d74d0f..57ac3d88f7 100644
--- a/tests/unit/session/test_io_bigquery.py
+++ b/tests/unit/session/test_io_bigquery.py
@@ -22,6 +22,7 @@
import bigframes
from bigframes.core import log_adapter
+import bigframes.core.events
import bigframes.pandas as bpd
import bigframes.session._io.bigquery as io_bq
from bigframes.testing import mocks
@@ -236,6 +237,7 @@ def test_start_query_with_client_labels_length_limit_met(
timeout=timeout,
metrics=None,
query_with_job=True,
+ publisher=bigframes.core.events.Publisher(),
)
assert job_config.labels is not None
diff --git a/tests/unit/session/test_read_gbq_table.py b/tests/unit/session/test_read_gbq_table.py
index 0c67e05813..d21f0000a9 100644
--- a/tests/unit/session/test_read_gbq_table.py
+++ b/tests/unit/session/test_read_gbq_table.py
@@ -24,13 +24,12 @@
@pytest.mark.parametrize(
- ("index_cols", "primary_keys", "values_distinct", "expected"),
+ ("index_cols", "primary_keys", "expected"),
(
- (["col1", "col2"], ["col1", "col2", "col3"], False, ("col1", "col2", "col3")),
+ (["col1", "col2"], ["col1", "col2", "col3"], ("col1", "col2", "col3")),
(
["col1", "col2", "col3"],
["col1", "col2", "col3"],
- True,
("col1", "col2", "col3"),
),
(
@@ -39,15 +38,14 @@
"col3",
"col2",
],
- True,
("col2", "col3"),
),
- (["col1", "col2"], [], False, ()),
- ([], ["col1", "col2", "col3"], False, ("col1", "col2", "col3")),
- ([], [], False, ()),
+ (["col1", "col2"], [], ()),
+ ([], ["col1", "col2", "col3"], ("col1", "col2", "col3")),
+ ([], [], ()),
),
)
-def test_infer_unique_columns(index_cols, primary_keys, values_distinct, expected):
+def test_infer_unique_columns(index_cols, primary_keys, expected):
"""If a primary key is set on the table, we use that as the index column
by default, no error should be raised in this case.
@@ -79,6 +77,49 @@ def test_infer_unique_columns(index_cols, primary_keys, values_distinct, expecte
"columns": primary_keys,
},
}
+
+ result = bf_read_gbq_table.infer_unique_columns(table, index_cols)
+
+ assert result == expected
+
+
+@pytest.mark.parametrize(
+ ("index_cols", "values_distinct", "expected"),
+ (
+ (
+ ["col1", "col2", "col3"],
+ True,
+ ("col1", "col2", "col3"),
+ ),
+ (
+ ["col2", "col3", "col1"],
+ True,
+ ("col2", "col3", "col1"),
+ ),
+ (["col1", "col2"], False, ()),
+ ([], False, ()),
+ ),
+)
+def test_check_if_index_columns_are_unique(index_cols, values_distinct, expected):
+ table = google.cloud.bigquery.Table.from_api_repr(
+ {
+ "tableReference": {
+ "projectId": "my-project",
+ "datasetId": "my_dataset",
+ "tableId": "my_table",
+ },
+ "clustering": {
+ "fields": ["col1", "col2"],
+ },
+ },
+ )
+ table.schema = (
+ google.cloud.bigquery.SchemaField("col1", "INT64"),
+ google.cloud.bigquery.SchemaField("col2", "INT64"),
+ google.cloud.bigquery.SchemaField("col3", "INT64"),
+ google.cloud.bigquery.SchemaField("col4", "INT64"),
+ )
+
bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True)
bqclient.project = "test-project"
session = mocks.create_bigquery_session(
@@ -87,13 +128,18 @@ def test_infer_unique_columns(index_cols, primary_keys, values_distinct, expecte
# Mock bqclient _after_ creating session to override its mocks.
bqclient.get_table.return_value = table
- bqclient.query_and_wait.side_effect = None
- bqclient.query_and_wait.return_value = (
+ bqclient._query_and_wait_bigframes.side_effect = None
+ bqclient._query_and_wait_bigframes.return_value = (
{"total_count": 3, "distinct_count": 3 if values_distinct else 2},
)
table._properties["location"] = session._location
- result = bf_read_gbq_table.infer_unique_columns(bqclient, table, index_cols)
+ result = bf_read_gbq_table.check_if_index_columns_are_unique(
+ bqclient=bqclient,
+ table=table,
+ index_cols=index_cols,
+ publisher=session._publisher,
+ )
assert result == expected
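
The test mocks `_query_and_wait_bigframes` to return a single `{"total_count": ..., "distinct_count": ...}` row, which implies the helper compares a total row count against a distinct count over the index columns. A hedged sketch of that probe; the actual SQL in `bf_read_gbq_table` may differ:

```python
def uniqueness_probe_sql(table_id: str, index_cols: list) -> str:
    # Distinct-count a STRUCT of the index columns so multi-column keys work.
    cols = ", ".join(f"`{col}`" for col in index_cols)
    return (
        "SELECT COUNT(*) AS total_count, "
        f"COUNT(DISTINCT TO_JSON_STRING(STRUCT({cols}))) AS distinct_count "
        f"FROM `{table_id}`"
    )

# The index is unique exactly when the two counts agree:
# row["total_count"] == row["distinct_count"]
```
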
diff --git a/tests/unit/session/test_session.py b/tests/unit/session/test_session.py
index 63c82eb30f..d05957b941 100644
--- a/tests/unit/session/test_session.py
+++ b/tests/unit/session/test_session.py
@@ -247,7 +247,7 @@ def test_read_gbq_cached_table():
table,
)
- session.bqclient.query_and_wait = mock.MagicMock(
+ session.bqclient._query_and_wait_bigframes = mock.MagicMock(
return_value=({"total_count": 3, "distinct_count": 2},)
)
session.bqclient.get_table.return_value = table
@@ -278,7 +278,7 @@ def test_read_gbq_cached_table_doesnt_warn_for_anonymous_tables_and_doesnt_inclu
table,
)
- session.bqclient.query_and_wait = mock.MagicMock(
+ session.bqclient._query_and_wait_bigframes = mock.MagicMock(
return_value=({"total_count": 3, "distinct_count": 2},)
)
session.bqclient.get_table.return_value = table
@@ -306,7 +306,9 @@ def test_default_index_warning_raised_by_read_gbq(table):
bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True)
bqclient.project = "test-project"
bqclient.get_table.return_value = table
- bqclient.query_and_wait.return_value = ({"total_count": 3, "distinct_count": 2},)
+ bqclient._query_and_wait_bigframes.return_value = (
+ {"total_count": 3, "distinct_count": 2},
+ )
session = mocks.create_bigquery_session(
bqclient=bqclient,
# DefaultIndexWarning is only relevant for strict mode.
@@ -333,7 +335,9 @@ def test_default_index_warning_not_raised_by_read_gbq_index_col_sequential_int64
bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True)
bqclient.project = "test-project"
bqclient.get_table.return_value = table
- bqclient.query_and_wait.return_value = ({"total_count": 4, "distinct_count": 3},)
+ bqclient._query_and_wait_bigframes.return_value = (
+ {"total_count": 4, "distinct_count": 3},
+ )
session = mocks.create_bigquery_session(
bqclient=bqclient,
# DefaultIndexWarning is only relevant for strict mode.
@@ -382,7 +386,7 @@ def test_default_index_warning_not_raised_by_read_gbq_index_col_columns(
bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True)
bqclient.project = "test-project"
bqclient.get_table.return_value = table
- bqclient.query_and_wait.return_value = (
+ bqclient._query_and_wait_bigframes.return_value = (
{"total_count": total_count, "distinct_count": distinct_count},
)
session = mocks.create_bigquery_session(
@@ -492,6 +496,7 @@ def query_mock(query, *args, **kwargs):
return session_query_mock(query, *args, **kwargs)
session.bqclient.query_and_wait = query_mock
+ session.bqclient._query_and_wait_bigframes = query_mock
def get_table_mock(table_ref):
table = google.cloud.bigquery.Table(
diff --git a/tests/unit/test_formatting_helpers.py b/tests/unit/test_formatting_helpers.py
index 588ef6e824..9dc1379496 100644
--- a/tests/unit/test_formatting_helpers.py
+++ b/tests/unit/test_formatting_helpers.py
@@ -19,6 +19,7 @@
import google.cloud.bigquery as bigquery
import pytest
+import bigframes.core.events as bfevents
import bigframes.formatting_helpers as formatting_helpers
import bigframes.version
@@ -30,7 +31,7 @@ def test_wait_for_query_job_error_includes_feedback_link():
)
with pytest.raises(api_core_exceptions.BadRequest) as cap_exc:
- formatting_helpers.wait_for_query_job(mock_query_job)
+ formatting_helpers.wait_for_job(mock_query_job)
cap_exc.match("Test message 123.")
cap_exc.match(constants.FEEDBACK_LINK)
@@ -70,3 +71,129 @@ def test_get_formatted_bytes(test_input, expected):
)
def test_get_formatted_time(test_input, expected):
assert formatting_helpers.get_formatted_time(test_input) == expected
+
+
+def test_render_bqquery_sent_event_html():
+ event = bfevents.BigQuerySentEvent(
+ query="SELECT * FROM my_table",
+ job_id="my-job-id",
+ location="us-central1",
+ billing_project="my-project",
+ )
+ html = formatting_helpers.render_bqquery_sent_event_html(event)
+ assert "SELECT * FROM my_table" in html
+ assert "my-job-id" in html
+ assert "us-central1" in html
+ assert "my-project" in html
+ assert "" in html
+
+
+def test_render_bqquery_sent_event_plaintext():
+ event = bfevents.BigQuerySentEvent(
+ query="SELECT * FROM my_table",
+ job_id="my-job-id",
+ location="us-central1",
+ billing_project="my-project",
+ )
+ text = formatting_helpers.render_bqquery_sent_event_plaintext(event)
+ assert "my-job-id" in text
+ assert "us-central1" in text
+ assert "my-project" in text
+ assert "SELECT * FROM my_table" not in text
+
+
+def test_render_bqquery_retry_event_html():
+ event = bfevents.BigQueryRetryEvent(
+ query="SELECT * FROM my_table",
+ job_id="my-job-id",
+ location="us-central1",
+ billing_project="my-project",
+ )
+ html = formatting_helpers.render_bqquery_retry_event_html(event)
+ assert "Retrying query" in html
+ assert "SELECT * FROM my_table" in html
+ assert "my-job-id" in html
+ assert "us-central1" in html
+ assert "my-project" in html
+ assert "" in html
+
+
+def test_render_bqquery_retry_event_plaintext():
+ event = bfevents.BigQueryRetryEvent(
+ query="SELECT * FROM my_table",
+ job_id="my-job-id",
+ location="us-central1",
+ billing_project="my-project",
+ )
+ text = formatting_helpers.render_bqquery_retry_event_plaintext(event)
+ assert "Retrying query" in text
+ assert "my-job-id" in text
+ assert "us-central1" in text
+ assert "my-project" in text
+ assert "SELECT * FROM my_table" not in text
+
+
+def test_render_bqquery_received_event_html():
+ mock_plan_entry = mock.create_autospec(
+ bigquery.job.query.QueryPlanEntry, instance=True
+ )
+ mock_plan_entry.__str__.return_value = "mocked plan"
+ event = bfevents.BigQueryReceivedEvent(
+ job_id="my-job-id",
+ location="us-central1",
+ billing_project="my-project",
+ state="RUNNING",
+ query_plan=[mock_plan_entry],
+ )
+ html = formatting_helpers.render_bqquery_received_event_html(event)
+ assert "Query" in html
+ assert "my-job-id" in html
+ assert "is RUNNING" in html
+ assert "" in html
+ assert "mocked plan" in html
+
+
+def test_render_bqquery_received_event_plaintext():
+ event = bfevents.BigQueryReceivedEvent(
+ job_id="my-job-id",
+ location="us-central1",
+ billing_project="my-project",
+ state="RUNNING",
+ query_plan=[],
+ )
+ text = formatting_helpers.render_bqquery_received_event_plaintext(event)
+ assert "Query" in text
+ assert "my-job-id" in text
+ assert "is RUNNING" in text
+ assert "Query Plan" not in text
+
+
+def test_render_bqquery_finished_event_html():
+ event = bfevents.BigQueryFinishedEvent(
+ job_id="my-job-id",
+ location="us-central1",
+ billing_project="my-project",
+ total_bytes_processed=1000,
+ slot_millis=2000,
+ )
+ html = formatting_helpers.render_bqquery_finished_event_html(event)
+ assert "Query" in html
+ assert "my-job-id" in html
+ assert "processed 1.0 kB" in html
+ assert "2 seconds of slot time" in html
+
+
+def test_render_bqquery_finished_event_plaintext():
+ event = bfevents.BigQueryFinishedEvent(
+ job_id="my-job-id",
+ location="us-central1",
+ billing_project="my-project",
+ total_bytes_processed=1000,
+ slot_millis=2000,
+ )
+ text = formatting_helpers.render_bqquery_finished_event_plaintext(event)
+ assert "Query" in text
+ assert "my-job-id" in text
+ assert "finished" in text
+ assert "1.0 kB processed" in text
+ assert "Slot time: 2 seconds" in text