perf: Fall back to ordering by bq pk when possible by TrevorBergeron · Pull Request #1350 · googleapis/python-bigquery-dataframes · GitHub
Merged
77 changes: 16 additions & 61 deletions bigframes/session/_io/bigquery/read_gbq_table.py
@@ -152,24 +152,28 @@ def validate_table(
         return False
 
 
-def are_index_cols_unique(
+def infer_unique_columns(
     bqclient: bigquery.Client,
     table: bigquery.table.Table,
     index_cols: List[str],
     api_name: str,
     metadata_only: bool = False,
-) -> bool:
-    if len(index_cols) == 0:
-        return False
+) -> Tuple[str, ...]:
+    """Return a set of columns that can provide a unique row key or empty if none can be inferred.
+
+    Note: primary keys are not enforced, but these are assumed to be unique
+    by the query engine, so we make the same assumption here.
+    """
     # If index_cols contain the primary_keys, the query engine assumes they
     # provide a unique index.
-    primary_keys = frozenset(_get_primary_keys(table))
-    if (len(primary_keys) > 0) and primary_keys <= frozenset(index_cols):
-        return True
+    primary_keys = tuple(_get_primary_keys(table))
+    if (len(primary_keys) > 0) and frozenset(primary_keys) <= frozenset(index_cols):
+        # Essentially, just reordering the primary key to match the index col order
+        return tuple(index_col for index_col in index_cols if index_col in primary_keys)
 
-    if metadata_only:
+    if primary_keys or metadata_only or (not index_cols):
         # Sometimes not worth scanning data to check uniqueness
-        return False
+        return primary_keys
     # TODO(b/337925142): Avoid a "SELECT *" subquery here by ensuring
     # table_expression only selects just index_cols.
     is_unique_sql = bigframes.core.sql.is_distinct_sql(index_cols, table.reference)
@@ -178,7 +182,9 @@ def are_index_cols_unique(
     results = bqclient.query_and_wait(is_unique_sql, job_config=job_config)
     row = next(iter(results))
 
-    return row["total_count"] == row["distinct_count"]
+    if row["total_count"] == row["distinct_count"]:
+        return tuple(index_cols)
+    return ()
 
 
 def _get_primary_keys(
@@ -279,54 +285,3 @@ def get_index_cols(
         index_cols = primary_keys
 
     return index_cols
-
-
-def get_time_travel_datetime_and_table_metadata(
[Inline review on the line above]
Collaborator: Dead code?
TrevorBergeron (author): yes, dead and now, gone
-    bqclient: bigquery.Client,
-    table_ref: bigquery.TableReference,
-    *,
-    api_name: str,
-    cache: Dict[bigquery.TableReference, Tuple[datetime.datetime, bigquery.Table]],
-    use_cache: bool = True,
-) -> Tuple[datetime.datetime, bigquery.Table]:
-    cached_table = cache.get(table_ref)
-    if use_cache and cached_table is not None:
-        snapshot_timestamp, _ = cached_table
-
-        # Cache hit could be unexpected. See internal issue 329545805.
-        # Raise a warning with more information about how to avoid the
-        # problems with the cache.
-        msg = (
-            f"Reading cached table from {snapshot_timestamp} to avoid "
-            "incompatibilities with previous reads of this table. To read "
-            "the latest version, set `use_cache=False` or close the "
-            "current session with Session.close() or "
-            "bigframes.pandas.close_session()."
-        )
-        # There are many layers before we get to (possibly) the user's code:
-        # pandas.read_gbq_table
-        # -> with_default_session
-        # -> Session.read_gbq_table
-        # -> _read_gbq_table
-        # -> _get_snapshot_sql_and_primary_key
-        # -> get_snapshot_datetime_and_table_metadata
-        warnings.warn(msg, stacklevel=7)
-        return cached_table
-
-    # TODO(swast): It's possible that the table metadata is changed between now
-    # and when we run the CURRENT_TIMESTAMP() query to see when we can time
-    # travel to. Find a way to fetch the table metadata and BQ's current time
-    # atomically.
-    table = bqclient.get_table(table_ref)
-
-    job_config = bigquery.QueryJobConfig()
-    job_config.labels["bigframes-api"] = api_name
-    snapshot_timestamp = list(
-        bqclient.query(
-            "SELECT CURRENT_TIMESTAMP() AS `current_timestamp`",
-            job_config=job_config,
-        ).result()
-    )[0][0]
-    cached_table = (snapshot_timestamp, table)
-    cache[table_ref] = cached_table
-    return cached_table
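For a quick read of the new control flow, here is a minimal standalone sketch of the three branches in infer_unique_columns. The _sketch suffix and the values_distinct flag are illustrative stand-ins; the real function gets keys from _get_primary_keys(table) and runs the is_distinct_sql probe rather than taking flags.

from typing import List, Tuple

def infer_unique_columns_sketch(
    index_cols: List[str],
    primary_keys: List[str],
    values_distinct: bool,
    metadata_only: bool = False,
) -> Tuple[str, ...]:
    # Branch 1: the requested index columns cover the primary key, so
    # reorder the key columns to match the index column order.
    if primary_keys and frozenset(primary_keys) <= frozenset(index_cols):
        return tuple(c for c in index_cols if c in primary_keys)
    # Branch 2: fall back to the primary key itself; also bail out here
    # (without scanning) when metadata_only is set or no index columns exist.
    if primary_keys or metadata_only or not index_cols:
        return tuple(primary_keys)
    # Branch 3: only now pay for the COUNT(*) vs COUNT(DISTINCT) probe,
    # modeled here by the values_distinct flag.
    return tuple(index_cols) if values_distinct else ()

The middle branch carries the perf win from the title: a table whose primary key is known from metadata never triggers the uniqueness scan, even when the requested index columns do not cover the key.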
6 changes: 3 additions & 3 deletions bigframes/session/loader.py
@@ -424,7 +424,7 @@ def read_gbq_table(
         # in the query that checks for index uniqueness.
         # TODO(b/338065601): Provide a way to assume uniqueness and avoid this
         # check.
-        is_index_unique = bf_read_gbq_table.are_index_cols_unique(
+        primary_key = bf_read_gbq_table.infer_unique_columns(
             bqclient=self._bqclient,
             table=table,
             index_cols=index_cols,
@@ -440,12 +440,12 @@ def read_gbq_table(
             schema=schema,
             predicate=filter_str,
             at_time=time_travel_timestamp if enable_snapshot else None,
-            primary_key=index_cols if is_index_unique else (),
+            primary_key=primary_key,
             session=self._session,
         )
         # if we don't have a unique index, we order by row hash if we are in strict mode
         if self._force_total_order:
-            if not is_index_unique:
+            if not primary_key:
                 array_value = array_value.order_by(
                     [
                         bigframes.core.ordering.OrderingExpression(
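The second hunk is where the fallback in the PR title lands: under _force_total_order, a non-empty primary_key skips the row-hash ordering entirely. A rough sketch of that choice follows; this is hand-written pseudocode, not the actual ArrayValue/OrderingExpression API.

from typing import List, Tuple

def choose_total_ordering(
    primary_key: Tuple[str, ...], force_total_order: bool
) -> List[str]:
    """Illustrative only: mirrors the ordering decision in loader.py."""
    if not force_total_order:
        return []  # partial-ordering mode: no total order imposed
    if primary_key:
        # Cheap path: unique columns already exist, so order by them
        # directly without touching row contents.
        return list(primary_key)
    # Expensive fallback: synthesize a total order by hashing each row.
    return ["<row_hash_of_all_columns>"]

# A table with primary key ("id",) avoids the hash ordering entirely.
assert choose_total_ordering(("id",), force_total_order=True) == ["id"]
assert choose_total_ordering((), force_total_order=True) == ["<row_hash_of_all_columns>"]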
21 changes: 13 additions & 8 deletions tests/unit/session/test_read_gbq_table.py
@@ -27,23 +27,28 @@
 @pytest.mark.parametrize(
     ("index_cols", "primary_keys", "values_distinct", "expected"),
     (
-        (["col1", "col2"], ["col1", "col2", "col3"], False, False),
-        (["col1", "col2", "col3"], ["col1", "col2", "col3"], True, True),
+        (["col1", "col2"], ["col1", "col2", "col3"], False, ("col1", "col2", "col3")),
+        (
+            ["col1", "col2", "col3"],
+            ["col1", "col2", "col3"],
+            True,
+            ("col1", "col2", "col3"),
+        ),
         (
             ["col2", "col3", "col1"],
             [
                 "col3",
                 "col2",
             ],
             True,
-            True,
+            ("col2", "col3"),
         ),
-        (["col1", "col2"], [], False, False),
-        ([], ["col1", "col2", "col3"], False, False),
-        ([], [], False, False),
+        (["col1", "col2"], [], False, ()),
+        ([], ["col1", "col2", "col3"], False, ("col1", "col2", "col3")),
+        ([], [], False, ()),
     ),
 )
-def test_are_index_cols_unique(index_cols, primary_keys, values_distinct, expected):
+def test_infer_unique_columns(index_cols, primary_keys, values_distinct, expected):
     """If a primary key is set on the table, we use that as the index column
     by default, no error should be raised in this case.
@@ -87,6 +92,6 @@ def test_are_index_cols_unique(index_cols, primary_keys, values_distinct, expected):
     )
     table._properties["location"] = session._location
 
-    result = bf_read_gbq_table.are_index_cols_unique(bqclient, table, index_cols, "")
+    result = bf_read_gbq_table.infer_unique_columns(bqclient, table, index_cols, "")
 
     assert result == expected
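The updated expectations line up with the three branches sketched after the first file. For example, feeding the second-to-last parametrized case into that hypothetical sketch function:

# ([], ["col1", "col2", "col3"], False, ("col1", "col2", "col3")):
# no index columns were requested, but the table's primary key still
# provides a usable unique row key, so no uniqueness query is needed.
assert infer_unique_columns_sketch(
    index_cols=[],
    primary_keys=["col1", "col2", "col3"],
    values_distinct=False,
) == ("col1", "col2", "col3")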
6 changes: 3 additions & 3 deletions tests/unit/session/test_session.py
@@ -217,10 +217,10 @@ def test_read_gbq_cached_table():
         table,
     )
 
-    session.bqclient.get_table.return_value = table
-    session.bqclient.query_and_wait.return_value = (
-        {"total_count": 3, "distinct_count": 2},
+    session.bqclient.query_and_wait = mock.MagicMock(
+        return_value=({"total_count": 3, "distinct_count": 2},)
     )
+    session.bqclient.get_table.return_value = table
 
     with pytest.warns(UserWarning, match=re.escape("use_cache=False")):
         df = session.read_gbq("my-project.my_dataset.my_table")
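On the mock reshuffle above: infer_unique_columns consumes the first row from query_and_wait and compares total_count against distinct_count, so the stub must be an iterable yielding one mapping. A minimal standalone reproduction of the pattern with plain unittest.mock, no bigframes fixtures assumed:

from unittest import mock

bqclient = mock.MagicMock()
# Stub the uniqueness probe: one row whose counts disagree -> not unique.
bqclient.query_and_wait = mock.MagicMock(
    return_value=({"total_count": 3, "distinct_count": 2},)
)

results = bqclient.query_and_wait("SELECT ...")  # the SQL is irrelevant to the stub
row = next(iter(results))
# With total != distinct, infer_unique_columns would return ().
assert row["total_count"] != row["distinct_count"]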