From 6377c51d4106642b6bb631a320853f96c5b07cec Mon Sep 17 00:00:00 2001 From: Garrett Wu Date: Fri, 9 Feb 2024 22:54:36 +0000 Subject: [PATCH 1/6] feat: support read_gbq wildcard table path --- bigframes/session/__init__.py | 58 ++++++++++++------- tests/system/small/test_session.py | 26 +++++++++ tests/unit/session/test_session.py | 27 ++++++++- .../bigframes_vendored/pandas/io/gbq.py | 11 +++- 4 files changed, 100 insertions(+), 22 deletions(-) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 15d4b3577b..396af39843 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -30,6 +30,7 @@ Iterable, List, Literal, + Mapping, MutableSequence, Optional, Sequence, @@ -115,6 +116,11 @@ def _is_query(query_or_table: str) -> bool: return re.search(r"\s", query_or_table.strip(), re.MULTILINE) is not None +def _is_table_with_wildcard_suffix(query_or_table: str) -> bool: + """Determine if `query_or_table` is a table and contains a wildcard suffix.""" + return not _is_query(query_or_table) and query_or_table.endswith("*") + + class Session( third_party_pandas_gbq.GBQIOMixin, third_party_pandas_parquet.ParquetIOMixin, @@ -248,7 +254,9 @@ def read_gbq( elif col_order: columns = col_order - query_or_table = self._filters_to_query(query_or_table, columns, filters) + filters = list(filters) + if len(filters) != 0 or _is_table_with_wildcard_suffix(query_or_table): + query_or_table = self._to_query(query_or_table, columns, filters) if _is_query(query_or_table): return self._read_gbq_query( @@ -272,13 +280,18 @@ def read_gbq( use_cache=use_cache, ) - def _filters_to_query(self, query_or_table, columns, filters): - """Convert filters to query""" - if len(filters) == 0: - return query_or_table - + def _to_query( + self, + query_or_table: str, + columns: Iterable[str], + filters: third_party_pandas_gbq.FiltersType, + ) -> str: + """Compile query_or_table with conditions(filters, wildcards) to query.""" + filters = list(filters) 
sub_query = ( - f"({query_or_table})" if _is_query(query_or_table) else query_or_table + f"({query_or_table})" + if _is_query(query_or_table) + else f"`{query_or_table}`" ) select_clause = "SELECT " + ( @@ -287,7 +300,7 @@ def _filters_to_query(self, query_or_table, columns, filters): where_clause = "" if filters: - valid_operators = { + valid_operators: Mapping[third_party_pandas_gbq.FilterOps, str] = { "in": "IN", "not in": "NOT IN", "==": "=", @@ -298,19 +311,10 @@ def _filters_to_query(self, query_or_table, columns, filters): "!=": "!=", } - if ( - isinstance(filters, Iterable) - and isinstance(filters[0], Tuple) - and (len(filters[0]) == 0 or not isinstance(filters[0][0], Tuple)) - ): - filters = [filters] - or_expressions = [] for group in filters: if not isinstance(group, Iterable): - raise ValueError( - f"Filter group should be a iterable, {group} is not valid." - ) + group = [group] and_expressions = [] for filter_item in group: @@ -329,9 +333,9 @@ def _filters_to_query(self, query_or_table, columns, filters): if operator not in valid_operators: raise ValueError(f"Operator {operator} is not valid.") - operator = valid_operators[operator] + operator_str = valid_operators[operator] - if operator in ["IN", "NOT IN"]: + if operator_str in ["IN", "NOT IN"]: value_list = ", ".join([repr(v) for v in value]) expression = f"`{column}` {operator} ({value_list})" else: @@ -521,6 +525,7 @@ def read_gbq_table( index_col: Iterable[str] | str = (), columns: Iterable[str] = (), max_results: Optional[int] = None, + filters: third_party_pandas_gbq.FiltersType = (), use_cache: bool = True, col_order: Iterable[str] = (), ) -> dataframe.DataFrame: @@ -546,6 +551,19 @@ def read_gbq_table( elif col_order: columns = col_order + filters = list(filters) + if len(filters) != 0 or _is_table_with_wildcard_suffix(query): + query = self._to_query(query, columns, filters) + + return self._read_gbq_query( + query, + index_col=index_col, + columns=columns, + max_results=max_results, + 
api_name="read_gbq_table", + use_cache=use_cache, + ) + return self._read_gbq_table( query=query, index_col=index_col, diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py index 2d9c332de1..6b687de1ef 100644 --- a/tests/system/small/test_session.py +++ b/tests/system/small/test_session.py @@ -327,6 +327,32 @@ def test_read_gbq_twice_with_same_timestamp(session, penguins_table_id): assert df3 is not None +def test_read_gbq_wildcard(session: bigframes.Session): + df = session.read_gbq("bigquery-public-data.noaa_gsod.gsod193*") + assert df.shape == (348485, 32) + + +def test_read_gbq_wildcard_with_filter(session: bigframes.Session): + df = session.read_gbq( + "bigquery-public-data.noaa_gsod.gsod19*", + filters=[("_table_suffix", ">=", "30"), ("_table_suffix", "<=", "39")], + ) + assert df.shape == (348485, 32) + + +def test_read_gbq_table_wildcard(session: bigframes.Session): + df = session.read_gbq_table("bigquery-public-data.noaa_gsod.gsod193*") + assert df.shape == (348485, 32) + + +def test_read_gbq_table_wildcard_with_filter(session: bigframes.Session): + df = session.read_gbq_table( + "bigquery-public-data.noaa_gsod.gsod19*", + filters=[("_table_suffix", ">=", "30"), ("_table_suffix", "<=", "39")], + ) + assert df.shape == (348485, 32) + + def test_read_gbq_model(session, penguins_linear_model_name): model = session.read_gbq_model(penguins_linear_model_name) assert isinstance(model, bigframes.ml.linear_model.LinearRegression) diff --git a/tests/unit/session/test_session.py b/tests/unit/session/test_session.py index ea8d0882ae..312e2eca80 100644 --- a/tests/unit/session/test_session.py +++ b/tests/unit/session/test_session.py @@ -156,5 +156,30 @@ def test_session_init_fails_with_no_project(): ) def test_read_gbq_with_filters(query_or_table, columns, filters, expected_output): session = resources.create_bigquery_session() - query = session._filters_to_query(query_or_table, columns, filters) + query = 
session._to_query(query_or_table, columns, filters) + assert query == expected_output + + +@pytest.mark.parametrize( + ("query_or_table", "columns", "filters", "expected_output"), + [ + pytest.param( + "test_table*", + [], + [], + "SELECT * FROM test_table* AS sub", + id="wildcard_table_input", + ), + pytest.param( + "test_table*", + [], + [("_TABLE_SUFFIX", ">", "2022-10-20")], + "SELECT * FROM test_table* AS sub WHERE `_TABLE_SUFFIX` > '2022-10-20'", + id="wildcard_table_input_with_filter", + ), + ], +) +def test_read_gbq_wildcard(query_or_table, columns, filters, expected_output): + session = resources.create_bigquery_session() + query = session._to_query(query_or_table, columns, filters) assert query == expected_output diff --git a/third_party/bigframes_vendored/pandas/io/gbq.py b/third_party/bigframes_vendored/pandas/io/gbq.py index 8e2c9f092d..a5fdb43902 100644 --- a/third_party/bigframes_vendored/pandas/io/gbq.py +++ b/third_party/bigframes_vendored/pandas/io/gbq.py @@ -7,7 +7,8 @@ from bigframes import constants -FilterType = Tuple[str, Literal["in", "not in", "<", "<=", "==", "!=", ">=", ">"], Any] +FilterOps = Literal["in", "not in", "<", "<=", "==", "!=", ">=", ">"] +FilterType = Tuple[str, FilterOps, Any] FiltersType = Iterable[Union[FilterType, Iterable[FilterType]]] @@ -52,6 +53,9 @@ def read_gbq( >>> df = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins") + Read table path with wildcard suffix and filters: + >>> df = bpd.read_gbq_table("bigquery-public-data.noaa_gsod.gsod19*", filters=[("_table_suffix", ">=", "30"), ("_table_suffix", "<=", "39")]) + Preserve ordering in a query input. >>> df = bpd.read_gbq(''' @@ -96,6 +100,8 @@ def read_gbq( A SQL string to be executed or a BigQuery table to be read. The table must be specified in the format of `project.dataset.tablename` or `dataset.tablename`. + Can also take wildcard table name, such as `project.dataset.table_prefix*`. + In that case, will read all the matched tables as one DataFrame. 
index_col (Iterable[str] or str): Name of result column(s) to use for index in results DataFrame. columns (Iterable[str]): @@ -112,6 +118,9 @@ def read_gbq( through an OR operation. A single Iterable of tuples can also be used, meaning that no OR operation between set of filters is to be conducted. + If using wildcard table suffix in query_or_table, can specify + '_table_suffix' pseudo column to filter the tables to be read + into the DataFrame. use_cache (bool, default True): Whether to cache the query inputs. Default to True. col_order (Iterable[str]): From 6c23180997c7633f8c070550fdcb5096cb0e9bd1 Mon Sep 17 00:00:00 2001 From: Garrett Wu Date: Mon, 12 Feb 2024 19:39:59 +0000 Subject: [PATCH 2/6] feat: support read_gbq wildcard table path --- bigframes/session/__init__.py | 6 ++++++ third_party/bigframes_vendored/pandas/io/gbq.py | 4 ++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 396af39843..6c29a43e1d 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -311,6 +311,12 @@ def _to_query( "!=": "!=", } + # If single layer filter, add another pseudo layer. So the single layer represents "and" logic. 
+ if isinstance(filters[0], tuple) and ( + len(filters[0]) == 0 or not isinstance(list(filters[0])[0], tuple) + ): + filters = typing.cast(third_party_pandas_gbq.FiltersType, [filters]) + or_expressions = [] for group in filters: if not isinstance(group, Iterable): diff --git a/third_party/bigframes_vendored/pandas/io/gbq.py b/third_party/bigframes_vendored/pandas/io/gbq.py index a5fdb43902..1f31c530d2 100644 --- a/third_party/bigframes_vendored/pandas/io/gbq.py +++ b/third_party/bigframes_vendored/pandas/io/gbq.py @@ -9,7 +9,7 @@ FilterOps = Literal["in", "not in", "<", "<=", "==", "!=", ">=", ">"] FilterType = Tuple[str, FilterOps, Any] -FiltersType = Iterable[Union[FilterType, Iterable[FilterType]]] +FiltersType = Union[Iterable[FilterType], Iterable[Iterable[FilterType]]] class GBQIOMixin: @@ -110,7 +110,7 @@ def read_gbq( max_results (Optional[int], default None): If set, limit the maximum number of rows to fetch from the query results. - filters (Iterable[Union[Tuple, Iterable[Tuple]]], default ()): To + filters (Union[Iterable[FilterType], Iterable[Iterable[FilterType]]], default ()): To filter out data. Filter syntax: [[(column, op, val), …],…] where op is [==, >, >=, <, <=, !=, in, not in]. 
The innermost tuples are transposed into a set of filters applied through an AND From ad09a7bbe2b7a45f3a7b4e33bfd829498f9116f6 Mon Sep 17 00:00:00 2001 From: Garrett Wu Date: Mon, 12 Feb 2024 20:37:34 +0000 Subject: [PATCH 3/6] fix tests --- tests/system/small/test_session.py | 4 ++-- tests/unit/session/test_session.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py index 6b687de1ef..85573472b9 100644 --- a/tests/system/small/test_session.py +++ b/tests/system/small/test_session.py @@ -335,7 +335,7 @@ def test_read_gbq_wildcard(session: bigframes.Session): def test_read_gbq_wildcard_with_filter(session: bigframes.Session): df = session.read_gbq( "bigquery-public-data.noaa_gsod.gsod19*", - filters=[("_table_suffix", ">=", "30"), ("_table_suffix", "<=", "39")], + filters=[("_table_suffix", ">=", "30"), ("_table_suffix", "<=", "39")], # type: ignore ) assert df.shape == (348485, 32) @@ -348,7 +348,7 @@ def test_read_gbq_table_wildcard(session: bigframes.Session): def test_read_gbq_table_wildcard_with_filter(session: bigframes.Session): df = session.read_gbq_table( "bigquery-public-data.noaa_gsod.gsod19*", - filters=[("_table_suffix", ">=", "30"), ("_table_suffix", "<=", "39")], + filters=[("_table_suffix", ">=", "30"), ("_table_suffix", "<=", "39")], # type: ignore ) assert df.shape == (348485, 32) diff --git a/tests/unit/session/test_session.py b/tests/unit/session/test_session.py index 312e2eca80..8a22ee18a8 100644 --- a/tests/unit/session/test_session.py +++ b/tests/unit/session/test_session.py @@ -167,14 +167,14 @@ def test_read_gbq_with_filters(query_or_table, columns, filters, expected_output "test_table*", [], [], - "SELECT * FROM test_table* AS sub", + "SELECT * FROM `test_table*` AS sub", id="wildcard_table_input", ), pytest.param( "test_table*", [], [("_TABLE_SUFFIX", ">", "2022-10-20")], - "SELECT * FROM test_table* AS sub WHERE `_TABLE_SUFFIX` > '2022-10-20'", + 
"SELECT * FROM `test_table*` AS sub WHERE `_TABLE_SUFFIX` > '2022-10-20'", id="wildcard_table_input_with_filter", ), ], From 2c04160ca6a48bf9cf9e805e34d00e64cba85dde Mon Sep 17 00:00:00 2001 From: Garrett Wu Date: Mon, 12 Feb 2024 20:45:43 +0000 Subject: [PATCH 4/6] fix tests --- bigframes/session/__init__.py | 4 ++-- tests/unit/session/test_session.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 6c29a43e1d..df0cd6e947 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -343,9 +343,9 @@ def _to_query( if operator_str in ["IN", "NOT IN"]: value_list = ", ".join([repr(v) for v in value]) - expression = f"`{column}` {operator} ({value_list})" + expression = f"`{column}` {operator_str} ({value_list})" else: - expression = f"`{column}` {operator} {repr(value)}" + expression = f"`{column}` {operator_str} {repr(value)}" and_expressions.append(expression) or_expressions.append(" AND ".join(and_expressions)) diff --git a/tests/unit/session/test_session.py b/tests/unit/session/test_session.py index 8a22ee18a8..b474c9f63e 100644 --- a/tests/unit/session/test_session.py +++ b/tests/unit/session/test_session.py @@ -125,7 +125,7 @@ def test_session_init_fails_with_no_project(): "test_table", [], [("date_col", ">", "2022-10-20")], - "SELECT * FROM test_table AS sub WHERE `date_col` > '2022-10-20'", + "SELECT * FROM `test_table` AS sub WHERE `date_col` > '2022-10-20'", id="table_input", ), pytest.param( @@ -136,7 +136,7 @@ def test_session_init_fails_with_no_project(): (("string_col", "in", ["Hello, World!", "こんにちは"]),), ], ( - "SELECT `row_index`, `string_col` FROM test_table AS sub WHERE " + "SELECT `row_index`, `string_col` FROM `test_table` AS sub WHERE " "`rowindex` NOT IN (0, 6) OR `string_col` IN ('Hello, World!', " "'こんにちは')" ), From 85b29da9b317cf671a20d340dbe288d3953a1943 Mon Sep 17 00:00:00 2001 From: Garrett Wu Date: Mon, 12 Feb 2024 21:17:23 
+0000 Subject: [PATCH 5/6] fix tests --- bigframes/pandas/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index 554acda202..0b62c6f4cd 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -548,6 +548,7 @@ def read_gbq_table( index_col: Iterable[str] | str = (), columns: Iterable[str] = (), max_results: Optional[int] = None, + filters: vendored_pandas_gbq.FiltersType = (), use_cache: bool = True, col_order: Iterable[str] = (), ) -> bigframes.dataframe.DataFrame: From f82db8a1af29a179f4336dd993b1996a8fd95541 Mon Sep 17 00:00:00 2001 From: Garrett Wu Date: Mon, 12 Feb 2024 21:19:43 +0000 Subject: [PATCH 6/6] fix tests --- bigframes/pandas/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index 0b62c6f4cd..56d07f640c 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -559,6 +559,7 @@ def read_gbq_table( index_col=index_col, columns=columns, max_results=max_results, + filters=filters, use_cache=use_cache, col_order=col_order, )