From e23cb8f3080541306ea11c78384f51f880b26d9e Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Fri, 17 May 2024 19:16:07 +0000 Subject: [PATCH 01/14] feat: bigframes.streaming module for continuous queries --- bigframes/streaming/__init__.py | 133 +++++++++++++++++++++++++++ tests/system/large/test_streaming.py | 43 +++++++++ 2 files changed, 176 insertions(+) create mode 100644 bigframes/streaming/__init__.py create mode 100644 tests/system/large/test_streaming.py diff --git a/bigframes/streaming/__init__.py b/bigframes/streaming/__init__.py new file mode 100644 index 0000000000..9c869d653f --- /dev/null +++ b/bigframes/streaming/__init__.py @@ -0,0 +1,133 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Module for bigquery continuous queries""" + +from typing import Optional + +from google.cloud import bigquery + +import bigframes + + +def write_stream_bigtable( + sql: str, + instance: str, + table: str, + bq_client: bigquery.Client = None, + app_profile: Optional[str] = None, + truncate: bool = False, + overwrite: bool = False, + auto_create_column_families: bool = False, + bigtable_options: Optional[str] = None, +) -> bigquery.QueryJob: + """Launches a BigQuery continuous query and returns a + QueryJob object for some management functionality. + + This method requires an existing bigtable preconfigured to + accept the continuous query export statement. For instructions + on export to bigtable, see + https://cloud.google.com/bigquery/docs/export-to-bigtable. + + Args: + sql (str): + The sql statement to execute as a continuous function. + For example: "SELECT * FROM dataset.table" + This will be wrapped in an EXPORT DATA statement to + launch a continuous query writing to bigtable. + instance (str): + The name of the bigtable instance to export to. + table (str): + The name of the bigtable table to export to. + bq_client (str, default None): + The Client object to use for the query. This determines + the project id and location of the query. If None, will + default to the bigframes global session default client. + app_profile (str, default None): + The bigtable app profile to export to. If None, no app + profile will be used. + truncate (bool, default False): + The export truncate option, see + https://cloud.google.com/bigquery/docs/reference/standard-sql/other-statements#bigtable_export_option + overwrite (bool, default False): + The export overwrite option, see + https://cloud.google.com/bigquery/docs/reference/standard-sql/other-statements#bigtable_export_option + auto_create_column_families (bool, default False): + The auto_create_column_families option, see + https://cloud.google.com/bigquery/docs/reference/standard-sql/other-statements#bigtable_export_option + bigtable_options (str, default None): + The bigtable options JSON string, see + https://cloud.google.com/bigquery/docs/reference/standard-sql/other-statements#bigtable_export_option + If None, no bigtable_options parameter will be passed. + + Returns: + google.cloud.bigquery.QueryJob: + See https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob + The ongoing query job can be managed using this object. + For example, the job can be cancelled or its error status + can be examined. + """ + # get default client if not passed + if bq_client is None: + bq_client = bigframes.get_global_session().bqclient + + # build export string from parameters + app_profile_url_string = "" + if app_profile is not None: + app_profile_url_string = f"appProfiles/{app_profile}/" + + truncate_string = "FALSE" + if truncate: + truncate_string = "TRUE" + + overwrite_string = "FALSE" + if overwrite: + overwrite_string = "TRUE" + + auto_create_column_families_string = "FALSE" + if auto_create_column_families: + auto_create_column_families_string = "TRUE" + + project = bq_client.project + + bigtable_options_parameter_string = "" + if bigtable_options is not None: + bigtable_options_parameter_string = ( + 'bigtable_options = """' + bigtable_options + '""",\n' + ) + + sql = ( + "EXPORT DATA\n" + "OPTIONS (\n" + "format = 'CLOUD_BIGTABLE',\n" + f"{bigtable_options_parameter_string}" + f"truncate = {truncate_string},\n" + f"overwrite = {overwrite_string},\n" + f"auto_create_column_families = {auto_create_column_families_string},\n" + f'uri = "https://bigtable.googleapis.com/projects/{project}/instances/{instance}/{app_profile_url_string}tables/{table}"\n' + ")\n" + "AS (\n" + f"{sql});" + ) + + # override continuous http parameter + job_config = bigquery.job.QueryJobConfig() + job_config = job_config.from_api_repr({"query": {"continuous": True}}) + job_config.to_api_repr() + + # begin the query job + query_job = bq_client.query(sql, job_config=job_config) + + # return the query job to the user for lifetime management + return query_job diff --git a/tests/system/large/test_streaming.py b/tests/system/large/test_streaming.py new file mode 100644 index 0000000000..168e474ca6 --- /dev/null +++ b/tests/system/large/test_streaming.py @@ -0,0 +1,43 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time + +import bigframes.streaming + + +def test_streaming(): + # launch a continuous query + sql = """SELECT + body_mass_g, island as rowkey + FROM birds.penguins""" + query_job = bigframes.streaming.write_stream_bigtable( + sql, + "streaming-testing-instance", + "table-testing", + app_profile="test-profile", + truncate=True, + overwrite=True, + auto_create_column_families=True, + bigtable_options="{}", + ) + + try: + # wait 100 seconds in order to ensure the query doesn't stop + # (i.e. it is continuous) + time.sleep(100) + assert query_job.errors is None + assert not query_job.done() + finally: + query_job.cancel() From a15de6ceeaa4648e774bf11e7f51e80aed89f023 Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Mon, 20 May 2024 15:19:01 +0000 Subject: [PATCH 02/14] mypy fix --- bigframes/streaming/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/streaming/__init__.py b/bigframes/streaming/__init__.py index 9c869d653f..9330726f5e 100644 --- a/bigframes/streaming/__init__.py +++ b/bigframes/streaming/__init__.py @@ -25,7 +25,7 @@ def write_stream_bigtable( sql: str, instance: str, table: str, - bq_client: bigquery.Client = None, + bq_client: Optional[bigquery.Client] = None, app_profile: Optional[str] = None, truncate: bool = False, overwrite: bool = False, From b2c9ef7d639c4648a5e2f800477f627693f12618 Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Mon, 20 May 2024 15:55:13 +0000 Subject: [PATCH 03/14] mypy fix 2 --- bigframes/streaming/__init__.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/bigframes/streaming/__init__.py b/bigframes/streaming/__init__.py index 9330726f5e..d1de5d1926 100644 --- a/bigframes/streaming/__init__.py +++ b/bigframes/streaming/__init__.py @@ -123,11 +123,10 @@ def write_stream_bigtable( # override continuous http parameter job_config = bigquery.job.QueryJobConfig() - job_config = job_config.from_api_repr({"query": {"continuous": True}}) - job_config.to_api_repr() + job_config_filled = job_config.from_api_repr({"query": {"continuous": True}}) # begin the query job - query_job = bq_client.query(sql, job_config=job_config) + query_job = bq_client.query(sql, job_config=job_config_filled) # return the query job to the user for lifetime management return query_job From 8ae237882fcc6e3427dae03ec9f28a531650ef9f Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Mon, 20 May 2024 16:09:58 +0000 Subject: [PATCH 04/14] mypy fix 3 --- bigframes/streaming/__init__.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/bigframes/streaming/__init__.py b/bigframes/streaming/__init__.py index d1de5d1926..e523e074ac 100644 --- a/bigframes/streaming/__init__.py +++ b/bigframes/streaming/__init__.py @@ -123,7 +123,9 @@ def write_stream_bigtable( # override continuous http parameter job_config = bigquery.job.QueryJobConfig() - job_config_filled = job_config.from_api_repr({"query": {"continuous": True}}) + job_config_filled: bigquery.job.QueryJobConfig = job_config.from_api_repr( + {"query": {"continuous": True}} + ) # begin the query job query_job = bq_client.query(sql, job_config=job_config_filled) From 3286ae6b35208001e7f5235de2065eb1c42aae87 Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Mon, 20 May 2024 16:24:05 +0000 Subject: [PATCH 05/14] ignore mypy, error is in bq library --- bigframes/streaming/__init__.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/bigframes/streaming/__init__.py b/bigframes/streaming/__init__.py index e523e074ac..8b28af7f26 100644 --- a/bigframes/streaming/__init__.py +++ b/bigframes/streaming/__init__.py @@ -123,12 +123,11 @@ def write_stream_bigtable( # override continuous http parameter job_config = bigquery.job.QueryJobConfig() - job_config_filled: bigquery.job.QueryJobConfig = job_config.from_api_repr( - {"query": {"continuous": True}} - ) + job_config_filled = job_config.from_api_repr({"query": {"continuous": True}}) # begin the query job - query_job = bq_client.query(sql, job_config=job_config_filled) + query_job = bq_client.query(sql, job_config=job_config_filled) # type:ignore + # typing error is in bq client library (should accept abstract, only takes concrete) # return the query job to the user for lifetime management return query_job From 64407585bdf39f46f03c2aecf0819b08bef0d079 Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Tue, 28 May 2024 18:55:33 +0000 Subject: [PATCH 06/14] address comments from meeting --- bigframes/streaming/__init__.py | 34 ++++++++++++++++++++-------- tests/system/large/test_streaming.py | 8 ++++--- 2 files changed, 29 insertions(+), 13 deletions(-) diff --git a/bigframes/streaming/__init__.py b/bigframes/streaming/__init__.py index 8b28af7f26..636638f15f 100644 --- a/bigframes/streaming/__init__.py +++ b/bigframes/streaming/__init__.py @@ -14,6 +14,7 @@ """Module for bigquery continuous queries""" +import json from typing import Optional from google.cloud import bigquery @@ -21,8 +22,8 @@ import bigframes -def write_stream_bigtable( - sql: str, +def to_bigtable( + query: str, instance: str, table: str, bq_client: Optional[bigquery.Client] = None, @@ -30,7 +31,9 @@ def write_stream_bigtable( truncate: bool = False, overwrite: bool = False, auto_create_column_families: bool = False, - bigtable_options: Optional[str] = None, + bigtable_options: Optional[dict] = None, + job_id: Optional[str] = None, + job_id_prefix: Optional[str] = None, ) -> bigquery.QueryJob: """Launches a BigQuery continuous query and returns a QueryJob object for some management functionality. @@ -41,7 +44,7 @@ def write_stream_bigtable( https://cloud.google.com/bigquery/docs/export-to-bigtable. Args: - sql (str): + query (str): The sql statement to execute as a continuous function. For example: "SELECT * FROM dataset.table" This will be wrapped in an EXPORT DATA statement to @@ -66,10 +69,19 @@ def write_stream_bigtable( auto_create_column_families (bool, default False): The auto_create_column_families option, see https://cloud.google.com/bigquery/docs/reference/standard-sql/other-statements#bigtable_export_option - bigtable_options (str, default None): - The bigtable options JSON string, see + bigtable_options (dict, default None): + The bigtable options dict, which will be converted to JSON + using json.dumps, see https://cloud.google.com/bigquery/docs/reference/standard-sql/other-statements#bigtable_export_option If None, no bigtable_options parameter will be passed. + job_id (str, default None): + If specified, replace the default job id for the query, + see job_id parameter of + https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_query + job_id_prefix (str, default None): + If specified, a job id prefix for the query, see + job_id_prefix parameter of + https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_query Returns: google.cloud.bigquery.QueryJob: @@ -104,7 +116,7 @@ def write_stream_bigtable( bigtable_options_parameter_string = "" if bigtable_options is not None: bigtable_options_parameter_string = ( - 'bigtable_options = """' + bigtable_options + '""",\n' + 'bigtable_options = """' + json.dumps(bigtable_options) + '""",\n' ) sql = ( @@ -118,7 +130,7 @@ def write_stream_bigtable( f'uri = "https://bigtable.googleapis.com/projects/{project}/instances/{instance}/{app_profile_url_string}tables/{table}"\n' ")\n" "AS (\n" - f"{sql});" + f"{query});" ) # override continuous http parameter @@ -126,8 +138,10 @@ def write_stream_bigtable( job_config_filled = job_config.from_api_repr({"query": {"continuous": True}}) # begin the query job - query_job = bq_client.query(sql, job_config=job_config_filled) # type:ignore - # typing error is in bq client library (should accept abstract, only takes concrete) + query_job = bq_client.query( + sql, job_config=job_config_filled, job_id=job_id, job_id_prefix=job_id_prefix + ) # type:ignore + # typing error is in bq client library (should accept abstract job_config, only takes concrete) # return the query job to the user for lifetime management return query_job diff --git a/tests/system/large/test_streaming.py b/tests/system/large/test_streaming.py index 168e474ca6..0aab721f6b 100644 --- a/tests/system/large/test_streaming.py +++ b/tests/system/large/test_streaming.py @@ -17,12 +17,12 @@ import bigframes.streaming -def test_streaming(): +def test_streaming_to_bigtable(): # launch a continuous query sql = """SELECT body_mass_g, island as rowkey FROM birds.penguins""" - query_job = bigframes.streaming.write_stream_bigtable( + query_job = bigframes.streaming.to_bigtable( sql, "streaming-testing-instance", "table-testing", @@ -30,7 +30,9 @@ def test_streaming(): truncate=True, overwrite=True, auto_create_column_families=True, - bigtable_options="{}", + bigtable_options={}, + job_id="test_streaming", + job_id_prefix="large_test", ) try: From 57163303d7061cbeddfa34f3f3821430bdcfb4a1 Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Tue, 28 May 2024 19:23:20 +0000 Subject: [PATCH 07/14] fix mypy --- bigframes/streaming/__init__.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/bigframes/streaming/__init__.py b/bigframes/streaming/__init__.py index 636638f15f..6944a0f69b 100644 --- a/bigframes/streaming/__init__.py +++ b/bigframes/streaming/__init__.py @@ -139,8 +139,11 @@ def to_bigtable( # begin the query job query_job = bq_client.query( - sql, job_config=job_config_filled, job_id=job_id, job_id_prefix=job_id_prefix - ) # type:ignore + sql, + job_config=job_config_filled, # type:ignore + job_id=job_id, + job_id_prefix=job_id_prefix, + ) # typing error is in bq client library (should accept abstract job_config, only takes concrete) # return the query job to the user for lifetime management From 9bd499b7dd7f6431e40920b37c8ae85cf7d55063 Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Fri, 31 May 2024 17:39:42 +0000 Subject: [PATCH 08/14] don't use app profile --- tests/system/large/test_streaming.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system/large/test_streaming.py b/tests/system/large/test_streaming.py index 0aab721f6b..907f5230aa 100644 --- a/tests/system/large/test_streaming.py +++ b/tests/system/large/test_streaming.py @@ -26,7 +26,7 @@ def test_streaming_to_bigtable(): sql, "streaming-testing-instance", "table-testing", - app_profile="test-profile", + app_profile=None, truncate=True, overwrite=True, auto_create_column_families=True, From 53ede9503095153c1d00ba141d5c6a3b39cb2842 Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Thu, 13 Jun 2024 16:44:40 +0000 Subject: [PATCH 09/14] address comments --- bigframes/streaming/__init__.py | 14 +++----------- tests/system/large/test_streaming.py | 2 +- 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/bigframes/streaming/__init__.py b/bigframes/streaming/__init__.py index 6944a0f69b..211b208bb2 100644 --- a/bigframes/streaming/__init__.py +++ b/bigframes/streaming/__init__.py @@ -1,4 +1,4 @@ -# Copyright 2023 Google LLC +# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -99,14 +99,6 @@ def to_bigtable( if app_profile is not None: app_profile_url_string = f"appProfiles/{app_profile}/" - truncate_string = "FALSE" - if truncate: - truncate_string = "TRUE" - - overwrite_string = "FALSE" - if overwrite: - overwrite_string = "TRUE" - auto_create_column_families_string = "FALSE" if auto_create_column_families: auto_create_column_families_string = "TRUE" @@ -124,8 +116,8 @@ def to_bigtable( "OPTIONS (\n" "format = 'CLOUD_BIGTABLE',\n" f"{bigtable_options_parameter_string}" - f"truncate = {truncate_string},\n" - f"overwrite = {overwrite_string},\n" + f"truncate = {str(truncate)},\n" + f"overwrite = {str(overwrite)},\n" f"auto_create_column_families = {auto_create_column_families_string},\n" f'uri = "https://bigtable.googleapis.com/projects/{project}/instances/{instance}/{app_profile_url_string}tables/{table}"\n' ")\n" diff --git a/tests/system/large/test_streaming.py b/tests/system/large/test_streaming.py index 907f5230aa..03ea448bc9 100644 --- a/tests/system/large/test_streaming.py +++ b/tests/system/large/test_streaming.py @@ -1,4 +1,4 @@ -# Copyright 2023 Google LLC +# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. From bb4ed6423997af2dd260b32405e6847b51e1e65c Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Thu, 13 Jun 2024 17:12:04 +0000 Subject: [PATCH 10/14] check job_id --- tests/system/large/test_streaming.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/system/large/test_streaming.py b/tests/system/large/test_streaming.py index 03ea448bc9..e40e260df8 100644 --- a/tests/system/large/test_streaming.py +++ b/tests/system/large/test_streaming.py @@ -19,6 +19,7 @@ def test_streaming_to_bigtable(): # launch a continuous query + job_id = "test_streaming" sql = """SELECT body_mass_g, island as rowkey FROM birds.penguins""" @@ -31,8 +32,8 @@ def test_streaming_to_bigtable(): overwrite=True, auto_create_column_families=True, bigtable_options={}, - job_id="test_streaming", - job_id_prefix="large_test", + job_id=job_id, + job_id_prefix="ignored_due_to_job_id", ) try: @@ -41,5 +42,6 @@ def test_streaming_to_bigtable(): time.sleep(100) assert query_job.errors is None assert not query_job.done() + assert query_job.job_id == job_id finally: query_job.cancel() From 6d9b0d9832de139410c7f087740f60189e5c01b5 Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Mon, 17 Jun 2024 20:26:47 +0000 Subject: [PATCH 11/14] add bigtable setup script --- scripts/create_bigtable.py | 76 +++++++++++++++++++++++++++++++++++++ setup.py | 1 + testing/constraints-3.9.txt | 1 + 3 files changed, 78 insertions(+) create mode 100644 scripts/create_bigtable.py diff --git a/scripts/create_bigtable.py b/scripts/create_bigtable.py new file mode 100644 index 0000000000..655e4b31ab --- /dev/null +++ b/scripts/create_bigtable.py @@ -0,0 +1,76 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This script create the bigtable resources required for +# bigframes.streaming testing if they don't already exist + +import os +import pathlib +import sys + +import google.cloud.bigtable as bigtable + +REPO_ROOT = pathlib.Path(__file__).parent.parent + +PROJECT_ID = os.getenv("GOOGLE_CLOUD_PROJECT") + +if not PROJECT_ID: + print( + "Please set GOOGLE_CLOUD_PROJECT environment variable before running.", + file=sys.stderr, + ) + sys.exit(1) + + +def create_instance(client): + instance_name = "streaming-testing-instance" + instance = bigtable.instance.Instance( + instance_name, + client, + ) + cluster_id = "streaming-testing-instance-c1" + cluster = instance.cluster( + cluster_id, + location_id="us-west1-a", + serve_nodes=1, + ) + if not instance.exists(): + operation = instance.create( + clusters=[cluster], + ) + operation.result(timeout=480) + print(f"Created instance {instance_name}") + return instance + + +def create_table(instance): + table_id = "table-testing" + table = bigtable.table.Table( + table_id, + instance, + ) + if not table.exists(): + table.create() + print(f"Created table {table_id}") + + +def main(): + client = bigtable.Client(project=PROJECT_ID, admin=True) + + instance = create_instance(client) + create_table(instance) + + +if __name__ == "__main__": + main() diff --git a/setup.py b/setup.py index d5d282d11a..dbd9ce5fc2 100644 --- a/setup.py +++ b/setup.py @@ -39,6 +39,7 @@ "gcsfs >=2023.3.0", "geopandas >=0.12.2", "google-auth >=2.15.0,<3.0dev", + "google-cloud-bigtable >=2.24.0", "google-cloud-bigquery[bqstorage,pandas] >=3.16.0", "google-cloud-functions >=1.12.0", "google-cloud-bigquery-connection >=1.12.0", diff --git a/testing/constraints-3.9.txt b/testing/constraints-3.9.txt index 3c51668655..bbd7bf0069 100644 --- a/testing/constraints-3.9.txt +++ b/testing/constraints-3.9.txt @@ -4,6 +4,7 @@ fsspec==2023.3.0 gcsfs==2023.3.0 geopandas==0.12.2 google-auth==2.15.0 +google-cloud-bigtable==2.24.0 google-cloud-bigquery==3.16.0 google-cloud-functions==1.12.0 google-cloud-bigquery-connection==1.12.0 From 635d938d7196c7a65e4d40a1ff590d65622d2709 Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Mon, 17 Jun 2024 20:56:38 +0000 Subject: [PATCH 12/14] further simplify string --- bigframes/streaming/__init__.py | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/bigframes/streaming/__init__.py b/bigframes/streaming/__init__.py index 211b208bb2..13e4d57169 100644 --- a/bigframes/streaming/__init__.py +++ b/bigframes/streaming/__init__.py @@ -95,22 +95,12 @@ def to_bigtable( bq_client = bigframes.get_global_session().bqclient # build export string from parameters - app_profile_url_string = "" - if app_profile is not None: - app_profile_url_string = f"appProfiles/{app_profile}/" - - auto_create_column_families_string = "FALSE" - if auto_create_column_families: - auto_create_column_families_string = "TRUE" - project = bq_client.project - bigtable_options_parameter_string = "" if bigtable_options is not None: bigtable_options_parameter_string = ( 'bigtable_options = """' + json.dumps(bigtable_options) + '""",\n' ) - sql = ( "EXPORT DATA\n" "OPTIONS (\n" @@ -118,8 +108,8 @@ def to_bigtable( f"{bigtable_options_parameter_string}" f"truncate = {str(truncate)},\n" f"overwrite = {str(overwrite)},\n" - f"auto_create_column_families = {auto_create_column_families_string},\n" - f'uri = "https://bigtable.googleapis.com/projects/{project}/instances/{instance}/{app_profile_url_string}tables/{table}"\n' + f"auto_create_column_families = {auto_create_column_families},\n" + f'uri = "https://bigtable.googleapis.com/projects/{project}/instances/{instance}/appProfiles/{app_profile}/tables/{table}"\n' ")\n" "AS (\n" f"{query});" @@ -133,10 +123,10 @@ def to_bigtable( query_job = bq_client.query( sql, job_config=job_config_filled, # type:ignore + # typing error is in bq client library (should accept abstract job_config, only takes concrete) job_id=job_id, job_id_prefix=job_id_prefix, ) - # typing error is in bq client library (should accept abstract job_config, only takes concrete) # return the query job to the user for lifetime management return query_job From 8c50ebb5b2af0494e250fa372fe6a35d248ed1b1 Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Mon, 17 Jun 2024 22:34:19 +0000 Subject: [PATCH 13/14] fix bugs --- bigframes/streaming/__init__.py | 11 +++++++++-- tests/system/large/test_streaming.py | 11 ++++++----- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/bigframes/streaming/__init__.py b/bigframes/streaming/__init__.py index 13e4d57169..2f25b004b3 100644 --- a/bigframes/streaming/__init__.py +++ b/bigframes/streaming/__init__.py @@ -96,11 +96,17 @@ def to_bigtable( # build export string from parameters project = bq_client.project + + app_profile_url_string = "" + if app_profile is not None: + app_profile_url_string = f"appProfiles/{app_profile}/" + bigtable_options_parameter_string = "" if bigtable_options is not None: bigtable_options_parameter_string = ( 'bigtable_options = """' + json.dumps(bigtable_options) + '""",\n' ) + sql = ( "EXPORT DATA\n" "OPTIONS (\n" @@ -109,7 +115,7 @@ def to_bigtable( f"truncate = {str(truncate)},\n" f"overwrite = {str(overwrite)},\n" f"auto_create_column_families = {auto_create_column_families},\n" - f'uri = "https://bigtable.googleapis.com/projects/{project}/instances/{instance}/appProfiles/{app_profile}/tables/{table}"\n' + f'uri = "https://bigtable.googleapis.com/projects/{project}/instances/{instance}/{app_profile_url_string}tables/{table}"\n' ")\n" "AS (\n" f"{query});" @@ -123,7 +129,8 @@ def to_bigtable( query_job = bq_client.query( sql, job_config=job_config_filled, # type:ignore - # typing error is in bq client library (should accept abstract job_config, only takes concrete) + # typing error above is in bq client library + # (should accept abstract job_config, only takes concrete) job_id=job_id, job_id_prefix=job_id_prefix, ) diff --git a/tests/system/large/test_streaming.py b/tests/system/large/test_streaming.py index e40e260df8..48db61e5bf 100644 --- a/tests/system/large/test_streaming.py +++ b/tests/system/large/test_streaming.py @@ -19,7 +19,7 @@ def test_streaming_to_bigtable(): # launch a continuous query - job_id = "test_streaming" + job_id_prefix = "test_streaming_" sql = """SELECT body_mass_g, island as rowkey FROM birds.penguins""" @@ -32,16 +32,17 @@ def test_streaming_to_bigtable(): overwrite=True, auto_create_column_families=True, bigtable_options={}, - job_id=job_id, - job_id_prefix="ignored_due_to_job_id", + job_id=None, + job_id_prefix=job_id_prefix, ) try: # wait 100 seconds in order to ensure the query doesn't stop # (i.e. it is continuous) time.sleep(100) + assert query_job.error_result is None assert query_job.errors is None - assert not query_job.done() - assert query_job.job_id == job_id + assert query_job.running() + assert str(query_job.job_id).startswith(job_id_prefix) finally: query_job.cancel() From 15f42edd820f28f44ae18a794670fddd3860a8ea Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Tue, 18 Jun 2024 17:42:57 +0000 Subject: [PATCH 14/14] add str() for consistent clarification --- bigframes/streaming/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/streaming/__init__.py b/bigframes/streaming/__init__.py index 2f25b004b3..16da677ef5 100644 --- a/bigframes/streaming/__init__.py +++ b/bigframes/streaming/__init__.py @@ -114,7 +114,7 @@ def to_bigtable( f"{bigtable_options_parameter_string}" f"truncate = {str(truncate)},\n" f"overwrite = {str(overwrite)},\n" - f"auto_create_column_families = {auto_create_column_families},\n" + f"auto_create_column_families = {str(auto_create_column_families)},\n" f'uri = "https://bigtable.googleapis.com/projects/{project}/instances/{instance}/{app_profile_url_string}tables/{table}"\n' ")\n" "AS (\n"