BigQuery Storage API sample for reading pandas dataframe (#1994) · devlance/python-docs-samples@e9bc7de · GitHub
Commit e9bc7de

BigQuery Storage API sample for reading pandas dataframe (GoogleCloudPlatform#1994)
* BigQuery Storage API sample for reading pandas dataframe

How to get a pandas DataFrame, fast! The first two examples use the existing BigQuery client; they create a thread pool and read in parallel. The final example uses only the new BigQuery Storage client, but shows how to read with a single thread. A condensed sketch of the core pattern is shown below, before the file listing.
1 parent 5456742 commit e9bc7de
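As a quick orientation (not part of the committed diff), here is a condensed sketch of the core pattern the samples below demonstrate. It assumes application default credentials and the dependency versions pinned in this commit (google-cloud-bigquery==1.8.1, google-cloud-bigquery-storage==0.2.0), and uses the same public table as the first test.

import google.auth
from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1

# Share one credentials object between both clients to avoid fetching
# duplicate authentication tokens.
credentials, project_id = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
bqclient = bigquery.Client(credentials=credentials, project=project_id)
bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient(
    credentials=credentials
)

# Passing the BigQuery Storage client to to_dataframe() downloads the rows
# over the Storage API, reading in parallel with a thread pool.
table = bigquery.TableReference.from_string(
    "bigquery-public-data.utility_us.country_code_iso"
)
dataframe = bqclient.list_rows(table).to_dataframe(
    bqstorage_client=bqstorageclient
)
print(dataframe.head())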

File tree

3 files changed: +195 -0 lines changed

bigquery_storage/to_dataframe/__init__.py

Whitespace-only changes.
Lines changed: 190 additions & 0 deletions
@@ -0,0 +1,190 @@
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import uuid

import pytest


@pytest.fixture
def clients():
    # [START bigquerystorage_pandas_tutorial_all]
    # [START bigquerystorage_pandas_tutorial_create_client]
    import google.auth
    from google.cloud import bigquery
    from google.cloud import bigquery_storage_v1beta1

    # Explicitly create a credentials object. This allows you to use the same
    # credentials for both the BigQuery and BigQuery Storage clients, avoiding
    # unnecessary API calls to fetch duplicate authentication tokens.
    credentials, your_project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )

    # Make clients.
    bqclient = bigquery.Client(
        credentials=credentials,
        project=your_project_id,
    )
    bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient(
        credentials=credentials
    )
    # [END bigquerystorage_pandas_tutorial_create_client]
    # [END bigquerystorage_pandas_tutorial_all]
    return bqclient, bqstorageclient


def test_table_to_dataframe(capsys, clients):
    from google.cloud import bigquery

    bqclient, bqstorageclient = clients

    # [START bigquerystorage_pandas_tutorial_all]
    # [START bigquerystorage_pandas_tutorial_read_table]
    # Download a table.
    table = bigquery.TableReference.from_string(
        "bigquery-public-data.utility_us.country_code_iso"
    )
    rows = bqclient.list_rows(
        table,
        selected_fields=[
            bigquery.SchemaField("country_name", "STRING"),
            bigquery.SchemaField("fips_code", "STRING"),
        ],
    )
    dataframe = rows.to_dataframe(bqstorage_client=bqstorageclient)
    print(dataframe.head())
    # [END bigquerystorage_pandas_tutorial_read_table]
    # [END bigquerystorage_pandas_tutorial_all]

    out, _ = capsys.readouterr()
    assert "country_name" in out


@pytest.fixture
def temporary_dataset(clients):
    from google.cloud import bigquery

    bqclient, _ = clients

    # [START bigquerystorage_pandas_tutorial_all]
    # [START bigquerystorage_pandas_tutorial_create_dataset]
    # Set the dataset_id to the dataset used to store temporary results.
    dataset_id = "query_results_dataset"
    # [END bigquerystorage_pandas_tutorial_create_dataset]
    # [END bigquerystorage_pandas_tutorial_all]

    dataset_id = "bqstorage_to_dataset_{}".format(uuid.uuid4().hex)

    # [START bigquerystorage_pandas_tutorial_all]
    # [START bigquerystorage_pandas_tutorial_create_dataset]
    dataset_ref = bqclient.dataset(dataset_id)
    dataset = bigquery.Dataset(dataset_ref)

    # Remove tables after 24 hours.
    dataset.default_table_expiration_ms = 1000 * 60 * 60 * 24

    bqclient.create_dataset(dataset)  # API request.
    # [END bigquerystorage_pandas_tutorial_create_dataset]
    # [END bigquerystorage_pandas_tutorial_all]
    yield dataset_ref
    # [START bigquerystorage_pandas_tutorial_cleanup]
    bqclient.delete_dataset(dataset_ref, delete_contents=True)
    # [END bigquerystorage_pandas_tutorial_cleanup]


def test_query_to_dataframe(capsys, clients, temporary_dataset):
    from google.cloud import bigquery

    bqclient, bqstorageclient = clients
    dataset_ref = temporary_dataset

    # [START bigquerystorage_pandas_tutorial_all]
    # [START bigquerystorage_pandas_tutorial_read_query_results]
    import uuid

    # Download query results.
    query_string = """
    SELECT
        CONCAT(
            'https://stackoverflow.com/questions/',
            CAST(id as STRING)) as url,
        view_count
    FROM `bigquery-public-data.stackoverflow.posts_questions`
    WHERE tags like '%google-bigquery%'
    ORDER BY view_count DESC
    """
    # Use a random table name to avoid overwriting existing tables.
    table_id = "queryresults_" + uuid.uuid4().hex
    table = dataset_ref.table(table_id)
    query_config = bigquery.QueryJobConfig(
        # Due to a known issue in the BigQuery Storage API, small query result
        # sets cannot be downloaded. To work around this issue, write results
        # to a destination table.
        destination=table
    )

    dataframe = (
        bqclient.query(query_string, job_config=query_config)
        .result()
        .to_dataframe(bqstorage_client=bqstorageclient)
    )
    print(dataframe.head())
    # [END bigquerystorage_pandas_tutorial_read_query_results]
    # [END bigquerystorage_pandas_tutorial_all]

    out, _ = capsys.readouterr()
    assert "stackoverflow" in out


def test_session_to_dataframe(capsys, clients):
    from google.cloud import bigquery_storage_v1beta1

    bqclient, bqstorageclient = clients
    your_project_id = bqclient.project

    # [START bigquerystorage_pandas_tutorial_all]
    # [START bigquerystorage_pandas_tutorial_read_session]
    table = bigquery_storage_v1beta1.types.TableReference()
    table.project_id = "bigquery-public-data"
    table.dataset_id = "new_york_trees"
    table.table_id = "tree_species"

    # Select columns to read with read options. If no read options are
    # specified, the whole table is read.
    read_options = bigquery_storage_v1beta1.types.TableReadOptions()
    read_options.selected_fields.append("species_common_name")
    read_options.selected_fields.append("fall_color")

    parent = "projects/{}".format(your_project_id)
    session = bqstorageclient.create_read_session(
        table, parent, read_options=read_options
    )

    # This example reads from only a single stream. Read from multiple streams
    # to fetch data faster. Note that the session may not contain any streams
    # if there are no rows to read.
    stream = session.streams[0]
    position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
    reader = bqstorageclient.read_rows(position)

    # Parse all Avro blocks and create a dataframe. This call requires a
    # session, because the session contains the schema for the row blocks.
    dataframe = reader.to_dataframe(session)
    print(dataframe.head())
    # [END bigquerystorage_pandas_tutorial_read_session]
    # [END bigquerystorage_pandas_tutorial_all]

    out, _ = capsys.readouterr()
    assert "species_common_name" in out
Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
google-auth==1.6.2
google-cloud-bigquery-storage==0.2.0
google-cloud-bigquery==1.8.1
fastavro==0.21.17
pandas==0.24.0
