fix: `read_gbq_table` respects primary keys even when `filters` are set by tswast · Pull Request #689 · googleapis/python-bigquery-dataframes · GitHub
3 changes: 2 additions & 1 deletion .pre-commit-config.yaml
@@ -35,7 +35,8 @@ repos:
hooks:
- id: flake8
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.1.1
rev: v1.10.0
hooks:
- id: mypy
additional_dependencies: [types-requests, types-tabulate, pandas-stubs]
args: ["--check-untyped-defs", "--explicit-package-bases", '--exclude="^third_party"', "--ignore-missing-imports"]
4 changes: 2 additions & 2 deletions bigframes/core/blocks.py
@@ -122,10 +122,10 @@ def __init__(

# If no index columns are set, create one.
#
# Note: get_index_cols_and_uniqueness in
# Note: get_index_cols in
# bigframes/session/_io/bigquery/read_gbq_table.py depends on this
# being as sequential integer index column. If this default behavior
# ever changes, please also update get_index_cols_and_uniqueness so
# ever changes, please also update get_index_cols so
# that users who explicitly request a sequential integer index can
# still get one.
if len(index_columns) == 0:
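The get_index_cols helper referenced in this comment is also where the PR's primary-key handling lives: when the caller does not request an index explicitly, the table's primary key columns are preferred, and only when there is none does the default sequential integer index described above apply. A minimal sketch of that resolution order, assuming primary keys are exposed via the BigQuery client's table_constraints metadata (names and details here are illustrative, not the repository's implementation):

    from typing import Iterable, List, Union

    import google.cloud.bigquery as bigquery

    def resolve_index_cols_sketch(
        table: bigquery.Table, index_col: Union[Iterable[str], str]
    ) -> List[str]:
        # An explicit request always wins.
        if isinstance(index_col, str):
            return [index_col] if index_col else []
        index_cols = list(index_col)
        if index_cols:
            return index_cols

        # Otherwise prefer the table's primary key, when one is defined.
        # Availability of table_constraints depends on the client library
        # version and on the table actually declaring a primary key.
        constraints = table.table_constraints
        if constraints is not None and constraints.primary_key is not None:
            return list(constraints.primary_key.columns)

        # An empty list tells the Block to create the default sequential
        # integer index described in the comment above.
        return []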
8 changes: 3 additions & 5 deletions bigframes/core/sql.py
@@ -33,7 +33,7 @@


### Writing SQL Values (literals, column references, table references, etc.)
def simple_literal(value: str | int | bool | float):
def simple_literal(value: str | int | bool | float | datetime.datetime):
"""Return quoted input string."""
# https://cloud.google.com/bigquery/docs/reference/standard-sql/lexical#literals
if isinstance(value, str):
@@ -50,6 +50,8 @@ def simple_literal(value: str | int | bool | float):
if value == -math.inf:
return 'CAST("-inf" as FLOAT)'
return str(value)
if isinstance(value, datetime.datetime):
return f"TIMESTAMP('{value.isoformat()}')"
else:
raise ValueError(f"Cannot produce literal for {value}")
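For a sense of what the extended helper emits, here are a couple of calls based on the branches visible in this hunk (assuming the module is importable as bigframes.core.sql; the string branch is elided above, so its output is not shown):

    import datetime
    import math

    from bigframes.core import sql

    sql.simple_literal(-math.inf)
    # -> 'CAST("-inf" as FLOAT)'
    sql.simple_literal(datetime.datetime(2024, 5, 1, 12, 30))
    # -> "TIMESTAMP('2024-05-01T12:30:00')"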

@@ -156,7 +158,3 @@ def ordering_clause(
part = f"`{ordering_expr.id}` {asc_desc} {null_clause}"
parts.append(part)
return f"ORDER BY {' ,'.join(parts)}"


def snapshot_clause(time_travel_timestamp: datetime.datetime):
return f"FOR SYSTEM_TIME AS OF TIMESTAMP({repr(time_travel_timestamp.isoformat())})"
2 changes: 2 additions & 0 deletions bigframes/pandas/__init__.py
@@ -549,6 +549,7 @@ def read_gbq_query(
max_results: Optional[int] = None,
use_cache: Optional[bool] = None,
col_order: Iterable[str] = (),
filters: vendored_pandas_gbq.FiltersType = (),
) -> bigframes.dataframe.DataFrame:
_set_default_session_location_if_possible(query)
return global_session.with_default_session(
@@ -560,6 +561,7 @@ def read_gbq_query(
max_results=max_results,
use_cache=use_cache,
col_order=col_order,
filters=filters,
)
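With filters threaded through the public entry point, a query result can now be filtered with the same pandas-gbq style tuples that read_gbq_table accepts. A small usage sketch against a public dataset (the table and column names are just an example, not taken from the PR):

    import bigframes.pandas as bpd

    df = bpd.read_gbq_query(
        "SELECT name, number, year FROM `bigquery-public-data.usa_names.usa_1910_2013`",
        filters=[("year", ">=", 2000), ("number", ">", 100)],
    )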


177 changes: 111 additions & 66 deletions bigframes/session/__init__.py
@@ -62,6 +62,7 @@
import ibis
import ibis.backends.bigquery as ibis_bigquery
import ibis.expr.types as ibis_types
import jellyfish
import numpy as np
import pandas
from pandas._typing import (
@@ -339,19 +340,6 @@ def read_gbq(
elif col_order:
columns = col_order

filters = list(filters)
if len(filters) != 0 or bf_io_bigquery.is_table_with_wildcard_suffix(
query_or_table
):
# TODO(b/338111344): This appears to be missing index_cols, which
# are necessary to be selected.
# TODO(b/338039517): Refactor this to be called inside both
# _read_gbq_query and _read_gbq_table (after detecting primary keys)
# so we can make sure index_col/index_cols reflects primary keys.
query_or_table = bf_io_bigquery.to_query(
query_or_table, _to_index_cols(index_col), columns, filters
)

if bf_io_bigquery.is_query(query_or_table):
return self._read_gbq_query(
query_or_table,
@@ -361,6 +349,7 @@ def read_gbq(
max_results=max_results,
api_name="read_gbq",
use_cache=use_cache,
filters=filters,
)
else:
if configuration is not None:
@@ -377,6 +366,7 @@ def read_gbq(
max_results=max_results,
api_name="read_gbq",
use_cache=use_cache if use_cache is not None else True,
filters=filters,
)

def _query_to_destination(
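The removed block above used to rewrite query_or_table eagerly whenever filters were present; per the TODO, that rewrite now happens inside _read_gbq_query and _read_gbq_table, after the index columns (including any primary keys) are known. As orientation for what such a rewrite involves, here is a stripped-down sketch of turning pandas-gbq style filter tuples into SQL predicates (the repository's to_query additionally handles OR groups, column selection, max_results and time travel):

    from typing import Iterable, Tuple

    def filters_to_where_sketch(filters: Iterable[Tuple[str, str, object]]) -> str:
        # Each tuple is (column, operator, value); tuples are ANDed together.
        predicates = []
        for column, operator, value in filters:
            if operator in ("in", "not in"):
                items = ", ".join(repr(v) for v in value)
                predicates.append(f"`{column}` {operator.upper()} ({items})")
            else:
                predicates.append(f"`{column}` {operator} {value!r}")
        return " AND ".join(predicates)

    filters_to_where_sketch([("species", "in", ["Adelie"]), ("year", ">", 2020)])
    # -> "`species` IN ('Adelie') AND `year` > 2020"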
@@ -451,6 +441,7 @@ def read_gbq_query(
max_results: Optional[int] = None,
use_cache: Optional[bool] = None,
col_order: Iterable[str] = (),
filters: third_party_pandas_gbq.FiltersType = (),
) -> dataframe.DataFrame:
"""Turn a SQL query into a DataFrame.

@@ -517,6 +508,7 @@ def read_gbq_query(
max_results=max_results,
api_name="read_gbq_query",
use_cache=use_cache,
filters=filters,
)

def _read_gbq_query(
@@ -529,6 +521,7 @@ def _read_gbq_query(
max_results: Optional[int] = None,
api_name: str = "read_gbq_query",
use_cache: Optional[bool] = None,
filters: third_party_pandas_gbq.FiltersType = (),
) -> dataframe.DataFrame:
import bigframes.dataframe as dataframe

@@ -557,6 +550,21 @@ def _read_gbq_query(

index_cols = _to_index_cols(index_col)

filters = list(filters)
if len(filters) != 0 or max_results is not None:
# TODO(b/338111344): If we are running a query anyway, we might as
# well generate ROW_NUMBER() at the same time.
query = bf_io_bigquery.to_query(
query,
index_cols,
columns,
filters,
max_results=max_results,
# We're executing the query, so we don't need time travel for
# determinism.
time_travel_timestamp=None,
)
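# As a rough picture of the rewrite above (illustrative, not exact output):
# given query "SELECT * FROM `proj.dataset.events`", index_cols ["event_id"],
# filters [("event_date", ">=", "2024-01-01")] and max_results 1000,
# to_query produces something along the lines of
#   SELECT `event_id`, ... FROM (SELECT * FROM `proj.dataset.events`)
#   WHERE `event_date` >= '2024-01-01' LIMIT 1000
# with no FOR SYSTEM_TIME AS OF clause, per the comment above.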

destination, query_job = self._query_to_destination(
query,
index_cols,
@@ -580,12 +588,14 @@ def _read_gbq_query(
session=self,
)

return self.read_gbq_table(
return self._read_gbq_table(
f"{destination.project}.{destination.dataset_id}.{destination.table_id}",
index_col=index_col,
columns=columns,
max_results=max_results,
use_cache=configuration["query"]["useQueryCache"],
api_name=api_name,
# max_results and filters are omitted because they are already
# handled by to_query(), above.
)

def read_gbq_table(
@@ -621,31 +631,14 @@ def read_gbq_table(
elif col_order:
columns = col_order

filters = list(filters)
if len(filters) != 0 or bf_io_bigquery.is_table_with_wildcard_suffix(query):
# TODO(b/338039517): Refactor this to be called inside both
# _read_gbq_query and _read_gbq_table (after detecting primary keys)
# so we can make sure index_col/index_cols reflects primary keys.
query = bf_io_bigquery.to_query(
query, _to_index_cols(index_col), columns, filters
)

return self._read_gbq_query(
query,
index_col=index_col,
columns=columns,
max_results=max_results,
api_name="read_gbq_table",
use_cache=use_cache,
)

return self._read_gbq_table(
query=query,
index_col=index_col,
columns=columns,
max_results=max_results,
api_name="read_gbq_table",
use_cache=use_cache,
filters=filters,
)

def _read_gbq_table(
@@ -657,6 +650,7 @@ def _read_gbq_table(
max_results: Optional[int] = None,
api_name: str,
use_cache: bool = True,
filters: third_party_pandas_gbq.FiltersType = (),
) -> dataframe.DataFrame:
import bigframes.dataframe as dataframe

@@ -673,6 +667,9 @@ def _read_gbq_table(
query, default_project=self.bqclient.project
)

columns = list(columns)
filters = list(filters)

# ---------------------------------
# Fetch table metadata and validate
# ---------------------------------
@@ -684,62 +681,110 @@ def _read_gbq_table(
cache=self._df_snapshot,
use_cache=use_cache,
)
table_column_names = {field.name for field in table.schema}

if table.location.casefold() != self._location.casefold():
raise ValueError(
f"Current session is in {self._location} but dataset '{table.project}.{table.dataset_id}' is located in {table.location}"
)

# -----------------------------------------
# Create Ibis table expression and validate
# -----------------------------------------

# Use a time travel to make sure the DataFrame is deterministic, even
# if the underlying table changes.
table_expression = bf_read_gbq_table.get_ibis_time_travel_table(
self.ibis_client,
table_ref,
time_travel_timestamp,
)

for key in columns:
if key not in table_expression.columns:
if key not in table_column_names:
possibility = min(
table_column_names,
key=lambda item: jellyfish.levenshtein_distance(key, item),
)
raise ValueError(
f"Column '{key}' of `columns` not found in this table."
f"Column '{key}' of `columns` not found in this table. Did you mean '{possibility}'?"
)

# ---------------------------------------
# Create a non-default index and validate
# ---------------------------------------

# TODO(b/337925142): Move index_cols creation to before we create the
# Ibis table expression so we don't have a "SELECT *" subquery in the
# query that checks for index uniqueness.

index_cols, is_index_unique = bf_read_gbq_table.get_index_cols_and_uniqueness(
bqclient=self.bqclient,
ibis_client=self.ibis_client,
# Converting index_col into a list of column names requires
# the table metadata because we might use the primary keys
# when constructing the index.
index_cols = bf_read_gbq_table.get_index_cols(
table=table,
table_expression=table_expression,
index_col=index_col,
api_name=api_name,
)

for key in index_cols:
if key not in table_expression.columns:
if key not in table_column_names:
possibility = min(
table_column_names,
key=lambda item: jellyfish.levenshtein_distance(key, item),
)
raise ValueError(
f"Column `{key}` of `index_col` not found in this table."
f"Column '{key}' of `index_col` not found in this table. Did you mean '{possibility}'?"
)
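# Both "Did you mean" hints above pick the existing column name with the
# smallest Levenshtein (edit) distance to the unknown name, e.g.:
#   jellyfish.levenshtein_distance("speces", "species")  # -> 1
#   min({"species", "island", "year"},
#       key=lambda item: jellyfish.levenshtein_distance("yr", item))  # -> 'year'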

# TODO(b/337925142): We should push down column filters when we get the time
# travel table to avoid "SELECT *" subqueries.
if columns:
table_expression = table_expression.select([*index_cols, *columns])
# -----------------------------
# Optionally, execute the query
# -----------------------------

# max_results introduces non-determinism and limits the cost on
# clustered tables, so fallback to a query. We do this here so that
# the index is consistent with tables that have primary keys, even
# when max_results is set.
# TODO(b/338419730): We don't need to fallback to a query for wildcard
# tables if we allow some non-determinism when time travel isn't supported.
if max_results is not None or bf_io_bigquery.is_table_with_wildcard_suffix(
query
):
# TODO(b/338111344): If we are running a query anyway, we might as
# well generate ROW_NUMBER() at the same time.
query = bf_io_bigquery.to_query(
query,
index_cols=index_cols,
columns=columns,
filters=filters,
max_results=max_results,
# We're executing the query, so we don't need time travel for
# determinism.
time_travel_timestamp=None,
)

return self._read_gbq_query(
query,
index_col=index_cols,
columns=columns,
api_name="read_gbq_table",
use_cache=use_cache,
)
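# For example, a wildcard table such as `bigquery-public-data.noaa_gsod.gsod*`
# (one table per year sharing the "gsod" prefix) takes this fallback path,
# since time travel is not supported there, as the TODO above notes.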

# -----------------------------------------
# Create Ibis table expression and validate
# -----------------------------------------

# Use a time travel to make sure the DataFrame is deterministic, even
# if the underlying table changes.
# TODO(b/340540991): If a dry run query fails with time travel but
# succeeds without it, omit the time travel clause and raise a warning
# about potential non-determinism if the underlying tables are modified.
table_expression = bf_read_gbq_table.get_ibis_time_travel_table(
ibis_client=self.ibis_client,
table_ref=table_ref,
index_cols=index_cols,
columns=columns,
filters=filters,
time_travel_timestamp=time_travel_timestamp,
)
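# Unlike the pre-change code, index_cols, columns and filters are passed down
# here, so the time travel expression can select just the needed columns and
# apply the filter predicates itself rather than relying on a "SELECT *"
# subquery over the whole table.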

# ----------------------------
# Create ordering and validate
# ----------------------------

# TODO(b/337925142): Generate a new subquery with just the index_cols
# in the Ibis table expression so we don't have a "SELECT *" subquery
# in the query that checks for index uniqueness.
# TODO(b/338065601): Provide a way to assume uniqueness and avoid this
# check.
is_index_unique = bf_read_gbq_table.are_index_cols_unique(
bqclient=self.bqclient,
ibis_client=self.ibis_client,
table=table,
index_cols=index_cols,
api_name=api_name,
)

if is_index_unique:
array_value = bf_read_gbq_table.to_array_value_with_total_ordering(
session=self,