-
Notifications
You must be signed in to change notification settings - Fork 63
perf: Fall back to ordering by bq pk when possible #1350
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
c5c06dc
perf: Fall back to ordering by bq pk when possible
TrevorBergeron f16233b
use pk before index, fix unit test
TrevorBergeron 988ba64
Apply suggestions from code review
tswast 56b0adc
Merge branch 'main' into use_pk_always
tswast 87cf620
Update bigframes/session/_io/bigquery/read_gbq_table.py
tswast b977e51
fix null index case
TrevorBergeron File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -152,24 +152,28 @@ def validate_table( | |
| return False | ||
|
|
||
|
|
||
| def are_index_cols_unique( | ||
| def infer_unique_columns( | ||
| bqclient: bigquery.Client, | ||
| table: bigquery.table.Table, | ||
| index_cols: List[str], | ||
| api_name: str, | ||
| metadata_only: bool = False, | ||
| ) -> bool: | ||
| if len(index_cols) == 0: | ||
| return False | ||
| ) -> Tuple[str, ...]: | ||
| """Return a set of columns that can provide a unique row key or empty if none can be inferred. | ||
|
|
||
| Note: primary keys are not enforced, but these are assumed to be unique | ||
| by the query engine, so we make the same assumption here. | ||
| """ | ||
| # If index_cols contain the primary_keys, the query engine assumes they are | ||
| # provide a unique index. | ||
| primary_keys = frozenset(_get_primary_keys(table)) | ||
| if (len(primary_keys) > 0) and primary_keys <= frozenset(index_cols): | ||
| return True | ||
| primary_keys = tuple(_get_primary_keys(table)) | ||
| if (len(primary_keys) > 0) and frozenset(primary_keys) <= frozenset(index_cols): | ||
| # Essentially, just reordering the primary key to match the index col order | ||
| return tuple(index_col for index_col in index_cols if index_col in primary_keys) | ||
|
|
||
| if metadata_only: | ||
| if primary_keys or metadata_only or (not index_cols): | ||
| # Sometimes not worth scanning data to check uniqueness | ||
| return False | ||
| return primary_keys | ||
| # TODO(b/337925142): Avoid a "SELECT *" subquery here by ensuring | ||
| # table_expression only selects just index_cols. | ||
| is_unique_sql = bigframes.core.sql.is_distinct_sql(index_cols, table.reference) | ||
|
|
@@ -178,7 +182,9 @@ def are_index_cols_unique( | |
| results = bqclient.query_and_wait(is_unique_sql, job_config=job_config) | ||
| row = next(iter(results)) | ||
|
|
||
| return row["total_count"] == row["distinct_count"] | ||
| if row["total_count"] == row["distinct_count"]: | ||
| return tuple(index_cols) | ||
| return () | ||
|
|
||
|
|
||
| def _get_primary_keys( | ||
|
|
@@ -279,54 +285,3 @@ def get_index_cols( | |
| index_cols = primary_keys | ||
|
|
||
| return index_cols | ||
|
|
||
|
|
||
| def get_time_travel_datetime_and_table_metadata( | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Dead code?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, dead and now, gone |
||
| bqclient: bigquery.Client, | ||
| table_ref: bigquery.TableReference, | ||
| *, | ||
| api_name: str, | ||
| cache: Dict[bigquery.TableReference, Tuple[datetime.datetime, bigquery.Table]], | ||
| use_cache: bool = True, | ||
| ) -> Tuple[datetime.datetime, bigquery.Table]: | ||
| cached_table = cache.get(table_ref) | ||
| if use_cache and cached_table is not None: | ||
| snapshot_timestamp, _ = cached_table | ||
|
|
||
| # Cache hit could be unexpected. See internal issue 329545805. | ||
| # Raise a warning with more information about how to avoid the | ||
| # problems with the cache. | ||
| msg = ( | ||
| f"Reading cached table from {snapshot_timestamp} to avoid " | ||
| "incompatibilies with previous reads of this table. To read " | ||
| "the latest version, set `use_cache=False` or close the " | ||
| "current session with Session.close() or " | ||
| "bigframes.pandas.close_session()." | ||
| ) | ||
| # There are many layers before we get to (possibly) the user's code: | ||
| # pandas.read_gbq_table | ||
| # -> with_default_session | ||
| # -> Session.read_gbq_table | ||
| # -> _read_gbq_table | ||
| # -> _get_snapshot_sql_and_primary_key | ||
| # -> get_snapshot_datetime_and_table_metadata | ||
| warnings.warn(msg, stacklevel=7) | ||
| return cached_table | ||
|
|
||
| # TODO(swast): It's possible that the table metadata is changed between now | ||
| # and when we run the CURRENT_TIMESTAMP() query to see when we can time | ||
| # travel to. Find a way to fetch the table metadata and BQ's current time | ||
| # atomically. | ||
| table = bqclient.get_table(table_ref) | ||
|
|
||
| job_config = bigquery.QueryJobConfig() | ||
| job_config.labels["bigframes-api"] = api_name | ||
| snapshot_timestamp = list( | ||
| bqclient.query( | ||
| "SELECT CURRENT_TIMESTAMP() AS `current_timestamp`", | ||
| job_config=job_config, | ||
| ).result() | ||
| )[0][0] | ||
| cached_table = (snapshot_timestamp, table) | ||
| cache[table_ref] = cached_table | ||
| return cached_table | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.