Commit 97f13c7
dataflow/flex-templates: Add Pub/Sub to BigQuery sample (GoogleCloudPlatform#3048)
* Adding python streaming beam example for consuming messages from PubSub into BigQuery.
* Adding the README, Dockerfile and other setup files. Also, refactored the python example code sample.
* Addressing review comments.
* Fixing a README typo.
* Addressing review comments.
* Addressing review comments and fixing nox lint errors.
* Fixing README instruction.
* Adding license to Dockerfile, removing template.json and minor modifications to README.
* Addressing the review comments on README.
* Updating the license header.
* Adding the python test case for the sample example.
* Addressing the comments on the test case.
* Adding the window_interval flag and using it in the test case.
* Addressing review comment.
* Removing trailing whitespace.

Co-authored-by: David Cavazos <dcavazos@google.com>
1 parent f9dbf42 commit 97f13c7

File tree: 6 files changed, +592 -0 lines changed
Dockerfile (27 additions, 0 deletions)

```Dockerfile
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

FROM gcr.io/dataflow-templates-base/python3-template-launcher-base

ARG WORKDIR=/dataflow/template
RUN mkdir -p ${WORKDIR}
WORKDIR ${WORKDIR}

COPY requirements.txt .
COPY streaming_beam.py .

ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/streaming_beam.py"

RUN pip install -U -r ./requirements.txt
```
README.md (284 additions, 0 deletions)

# Dataflow flex templates - Streaming Beam

[![Open in Cloud Shell](http://gstatic.com/cloudssh/images/open-btn.svg)](https://console.cloud.google.com/cloudshell/editor)

📝 Docs: [Using Flex Templates](https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates)

Samples showing how to create and run an
[Apache Beam](https://beam.apache.org/) template with a custom Docker image on
[Google Cloud Dataflow](https://cloud.google.com/dataflow/docs/).

## Before you begin

Follow the
[Getting started with Google Cloud Dataflow](../README.md)
page, and make sure you have a Google Cloud project with billing enabled
and a *service account JSON key* set up in your `GOOGLE_APPLICATION_CREDENTIALS`
environment variable.
Additionally, for this sample you need the following:

1. [Enable the APIs](https://console.cloud.google.com/flows/enableapi?apiid=appengine.googleapis.com,cloudscheduler.googleapis.com,cloudbuild.googleapis.com):
    App Engine, Cloud Scheduler, Cloud Build.

1. Create a
    [Cloud Storage bucket](https://cloud.google.com/storage/docs/creating-buckets).

    ```sh
    export BUCKET="your-gcs-bucket"
    gsutil mb gs://$BUCKET
    ```

1. Create a
    [Pub/Sub topic](https://cloud.google.com/pubsub/docs/admin#creating_a_topic)
    and a
    [subscription](https://cloud.google.com/pubsub/docs/admin#creating_subscriptions)
    to that topic.
    This is the streaming source of data for the sample.

    ```sh
    # For simplicity we use the same topic name as the subscription name.
    export TOPIC="messages"
    export SUBSCRIPTION="$TOPIC"

    gcloud pubsub topics create $TOPIC
    gcloud pubsub subscriptions create --topic $TOPIC $SUBSCRIPTION
    ```

1. Create a
    [Cloud Scheduler job](https://cloud.google.com/scheduler/docs/quickstart)
    to publish "positive" and "negative" ratings every
    [1 and 2 minutes](https://cloud.google.com/scheduler/docs/configuring/cron-job-schedules).
    This publishes messages to the Pub/Sub source topic.

    ```sh
    # Create a publisher for "positive ratings" that publishes 1 message per minute.
    # If an App Engine app does not exist for the project, this step will create one.
    gcloud scheduler jobs create pubsub positive-ratings-publisher \
      --schedule="* * * * *" \
      --topic="$TOPIC" \
      --message-body='{"url": "https://beam.apache.org/", "review": "positive"}'

    # Start the job.
    gcloud scheduler jobs run positive-ratings-publisher

    # Create and run another similar publisher for "negative ratings" that
    # publishes 1 message every 2 minutes.
    gcloud scheduler jobs create pubsub negative-ratings-publisher \
      --schedule="*/2 * * * *" \
      --topic="$TOPIC" \
      --message-body='{"url": "https://beam.apache.org/", "review": "negative"}'

    gcloud scheduler jobs run negative-ratings-publisher
    ```

1. Create a [BigQuery dataset](https://cloud.google.com/bigquery/docs/datasets).
    The pipeline writes its output to a table in this dataset.

    ```sh
    export PROJECT="$(gcloud config get-value project)"
    export DATASET="beam_samples"
    export TABLE="streaming_beam"

    bq mk --dataset "$PROJECT:$DATASET"
    ```

1. Clone the
    [`python-docs-samples` repository](https://github.com/GoogleCloudPlatform/python-docs-samples)
    and navigate to the code sample.

    ```sh
    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/dataflow/flex-templates/streaming_beam
    ```

## Pub/Sub to BigQuery sample

This sample shows how to deploy an Apache Beam streaming pipeline that reads
[JSON encoded](https://www.w3schools.com/whatis/whatis_json.asp) messages from
[Pub/Sub](https://cloud.google.com/pubsub),
transforms the message data, and writes the results to a
[BigQuery](https://cloud.google.com/bigquery) table.

* [Dockerfile](Dockerfile)
* [streaming_beam.py](streaming_beam.py)
* [metadata.json](metadata.json)

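The pipeline code lives in [streaming_beam.py](streaming_beam.py), which is not reproduced in this diff. As a rough orientation only, a minimal Apache Beam pipeline for this Pub/Sub-to-BigQuery pattern could look like the sketch below. The transform names, the `window_interval` default, and the `url:STRING,review:STRING` schema are illustrative assumptions rather than the sample's exact code; the actual sample may transform the data further (for example, computing review scores per window).

```py
import argparse
import json
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window


def run(input_subscription, output_table, window_interval=60, pipeline_args=None):
    """Read JSON messages from Pub/Sub and stream them into BigQuery."""
    options = PipelineOptions(pipeline_args, streaming=True, save_main_session=True)

    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            # Read raw bytes from the Pub/Sub subscription
            # (expects the full path: projects/<project>/subscriptions/<name>).
            | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(subscription=input_subscription)
            # Decode and parse each JSON message into a Python dict.
            | "Parse JSON" >> beam.Map(lambda data: json.loads(data.decode("utf-8")))
            # Group elements into fixed windows before writing.
            | "Fixed windows" >> beam.WindowInto(window.FixedWindows(window_interval))
            # Stream each parsed row into the BigQuery output table.
            | "Write to BigQuery" >> beam.io.WriteToBigQuery(
                output_table,  # e.g. "project:dataset.table"
                schema="url:STRING,review:STRING",  # assumed schema, for illustration
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            )
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_subscription", required=True)
    parser.add_argument("--output_table", required=True)
    parser.add_argument("--window_interval", type=int, default=60)
    known_args, pipeline_args = parser.parse_known_args()
    run(known_args.input_subscription, known_args.output_table,
        known_args.window_interval, pipeline_args)
```

The `--input_subscription` and `--output_table` flags correspond to the template parameters declared in [metadata.json](metadata.json); the commit message also mentions a `window_interval` flag used by the test case.
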
### Building a container image

We will build the
[Docker](https://docs.docker.com/engine/docker-overview/)
image for the Apache Beam pipeline.
We are using
[Cloud Build](https://cloud.google.com/cloud-build)
so we don't need a local installation of Docker.

> ℹ️ You can speed up subsequent builds with
> [Kaniko cache](https://cloud.google.com/cloud-build/docs/kaniko-cache)
> in Cloud Build.
>
> ```sh
> # (Optional) Enable Kaniko cache by default.
> gcloud config set builds/use_kaniko True
> ```

Cloud Build allows you to
[build a Docker image using a `Dockerfile`](https://cloud.google.com/cloud-build/docs/quickstart-docker#build_using_dockerfile)
and saves it into
[Container Registry](https://cloud.google.com/container-registry/),
where the image is accessible to other Google Cloud products.

```sh
export TEMPLATE_IMAGE="gcr.io/$PROJECT/samples/dataflow/streaming-beam:latest"

# Build the image into Container Registry. This is roughly equivalent to:
#   gcloud auth configure-docker
#   docker image build -t $TEMPLATE_IMAGE .
#   docker push $TEMPLATE_IMAGE
gcloud builds submit --tag "$TEMPLATE_IMAGE" .
```

Images starting with `gcr.io/PROJECT/` are saved into your project's
Container Registry, where the image is accessible to other Google Cloud products.

### Creating a Flex Template

To run a template, you need to create a *template spec* file containing all the
necessary information to run the job, such as the SDK information and metadata.

The [`metadata.json`](metadata.json) file contains additional information for
the template, such as the template "name", "description", and input "parameters" fields.

The template file must be created in a Cloud Storage location,
and is used to run a new Dataflow job.

```sh
export TEMPLATE_PATH="gs://$BUCKET/samples/dataflow/templates/streaming-beam.json"

# Build the Flex Template.
gcloud beta dataflow flex-template build $TEMPLATE_PATH \
  --image "$TEMPLATE_IMAGE" \
  --sdk-language "PYTHON" \
  --metadata-file "metadata.json"
```

The template is now available through the template file in the Cloud Storage
location that you specified.

### Running a Dataflow Flex Template pipeline

You can now run the Apache Beam pipeline in Dataflow by referring to the
template file and passing the template
[parameters](https://cloud.google.com/dataflow/docs/guides/specifying-exec-params#setting-other-cloud-dataflow-pipeline-options)
required by the pipeline.

```sh
# Run the Flex Template.
gcloud beta dataflow flex-template run "streaming-beam-`date +%Y%m%d-%H%M%S`" \
  --template-file-gcs-location "$TEMPLATE_PATH" \
  --parameters "input_subscription=$SUBSCRIPTION,output_table=$PROJECT:$DATASET.$TABLE"
```

Check the results in BigQuery by running the following query:

```sh
bq query --use_legacy_sql=false 'SELECT * FROM `'"$PROJECT.$DATASET.$TABLE"'`'
```

While this pipeline is running, you can see new rows appended into the BigQuery
table every minute.

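If you prefer to inspect the output from Python rather than the `bq` CLI, a minimal sketch using the `google-cloud-bigquery` client library could look like the following. The library is an extra dependency, not part of this sample's requirements, and the environment variables are the ones exported earlier in this README.

```py
# Minimal sketch: read the output table with the BigQuery client library.
# Assumes `pip install google-cloud-bigquery` and that
# GOOGLE_APPLICATION_CREDENTIALS points at a service account key.
import os

from google.cloud import bigquery

project = os.environ["PROJECT"]
dataset = os.environ.get("DATASET", "beam_samples")
table = os.environ.get("TABLE", "streaming_beam")

client = bigquery.Client(project=project)
query = f"SELECT * FROM `{project}.{dataset}.{table}`"

# Print each output row as a plain dict of column name to value.
for row in client.query(query).result():
    print(dict(row.items()))
```
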
You can manually publish more messages from the
[Cloud Scheduler page](https://console.cloud.google.com/cloudscheduler)
to see how that affects the page review scores.

You can also publish messages directly to a topic through the
[Pub/Sub topics page](https://console.cloud.google.com/cloudpubsub/topic/list)
by selecting the topic you want to publish to,
and then clicking the "Publish message" button at the top.
This way you can test your pipeline with different URLs;
just make sure you pass valid JSON data, since this sample does not do any
error handling, to keep the code simple.

Try sending the following message and check the BigQuery table about
a minute later.

```json
{"url": "https://cloud.google.com/bigquery/", "review": "positive"}
```

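You can also publish the same test message programmatically. Here is a minimal sketch using the `google-cloud-pubsub` client library, which is not required by this sample and would need to be installed separately:

```py
# Minimal sketch: publish a test review message to the sample's Pub/Sub topic.
# Assumes `pip install google-cloud-pubsub` and the PROJECT/TOPIC values
# exported earlier in this README.
import json
import os

from google.cloud import pubsub_v1

project = os.environ["PROJECT"]
topic = os.environ.get("TOPIC", "messages")

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project, topic)

message = {"url": "https://cloud.google.com/bigquery/", "review": "positive"}
# Pub/Sub messages carry bytes, so encode the JSON payload.
future = publisher.publish(topic_path, data=json.dumps(message).encode("utf-8"))
print("Published message ID:", future.result())
```
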
### Cleaning up

After you've finished this tutorial, you can clean up the resources you created
on Google Cloud so you won't be billed for them in the future.
The following sections describe how to delete or turn off these resources.

#### Clean up the Flex template resources

1. Stop the Dataflow pipeline.

    ```sh
    gcloud dataflow jobs list \
      --filter 'NAME:streaming-beam AND STATE=Running' \
      --format 'value(JOB_ID)' \
      | xargs gcloud dataflow jobs cancel
    ```

1. Delete the template spec file from Cloud Storage.

    ```sh
    gsutil rm $TEMPLATE_PATH
    ```

1. Delete the Flex Template container image from Container Registry.

    ```sh
    gcloud container images delete $TEMPLATE_IMAGE --force-delete-tags
    ```

#### Clean up Google Cloud project resources

1. Delete the Cloud Scheduler jobs.

    ```sh
    gcloud scheduler jobs delete negative-ratings-publisher
    gcloud scheduler jobs delete positive-ratings-publisher
    ```

1. Delete the Pub/Sub subscription and topic.

    ```sh
    gcloud pubsub subscriptions delete $SUBSCRIPTION
    gcloud pubsub topics delete $TOPIC
    ```

1. Delete the BigQuery table.

    ```sh
    bq rm -f -t $PROJECT:$DATASET.$TABLE
    ```

1. Delete the BigQuery dataset; deleting the dataset alone does not incur any charges.

    > ⚠️ The following command also deletes all tables in the dataset.
    > The tables and data cannot be recovered.
    >
    > ```sh
    > bq rm -r -f -d $PROJECT:$DATASET
    > ```

1. Delete the Cloud Storage bucket; deleting the bucket alone does not incur any charges.

    > ⚠️ The following command also deletes all objects in the bucket.
    > These objects cannot be recovered.
    >
    > ```sh
    > gsutil rm -r gs://$BUCKET
    > ```

## Limitations

* You must use a Google-provided base image to package your containers using Docker.
* You cannot update streaming jobs using Flex Templates.
* You cannot use FlexRS for Flex Template jobs.

📝 Docs: [Using Flex Templates](https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates)
metadata.json (23 additions, 0 deletions)

```json
{
  "name": "Streaming beam Python flex template",
  "description": "Streaming beam example for python flex template.",
  "parameters": [
    {
      "name": "input_subscription",
      "label": "Input PubSub subscription.",
      "helpText": "Name of the input PubSub subscription to consume from.",
      "regexes": [
        "[/:-_.a-zA-Z0-9]+"
      ]
    },
    {
      "name": "output_table",
      "label": "BigQuery output table name.",
      "helpText": "Name of the BigQuery output table.",
      "is_optional": true,
      "regexes": [
        "[/:-_.a-zA-Z0-9]+"
      ]
    }
  ]
}
```
requirements.txt (1 addition, 0 deletions)

```
apache-beam[gcp]==2.19.0
```
