Add Dataflow script to run templates (#2175) · dhasegan/python-docs-samples@a9f269b · GitHub

Commit a9f269b

davidcavazos authored and andrewsg committed
1 parent b8298b3 commit a9f269b
File tree: 4 files changed, +341 −0 lines changed
dataflow/run_template/README.md

Lines changed: 137 additions & 0 deletions

# Run template

[`main.py`](main.py) - Script to run an [Apache Beam] template on [Google Cloud Dataflow].

The following examples show how to run the [`Word_Count` template], but you can run any other template.

The `Word_Count` template requires an `output` Cloud Storage path prefix, and optionally accepts an `inputFile` Cloud Storage file pattern for the inputs.
If `inputFile` is not provided, it defaults to `gs://apache-beam-samples/shakespeare/kinglear.txt`.

## Before you begin

1. Install the [Cloud SDK].

1. [Create a new project].

1. [Enable billing].

1. [Enable the APIs](https://console.cloud.google.com/flows/enableapi?apiid=dataflow,compute_component,logging,storage_component,storage_api,bigquery,pubsub,datastore.googleapis.com,cloudfunctions.googleapis.com,cloudresourcemanager.googleapis.com): Dataflow, Compute Engine, Stackdriver Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Pub/Sub, Datastore, Cloud Functions, and Cloud Resource Manager.

1. Set up the Cloud SDK with your GCP project.

   ```bash
   gcloud init
   ```

1. Create a Cloud Storage bucket.

   ```bash
   gsutil mb gs://your-gcs-bucket
   ```

## Setup

The following instructions will help you prepare your development environment.

1. [Install Python and virtualenv].

1. Clone the `python-docs-samples` repository.

   ```bash
   git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
   ```

1. Navigate to the sample code directory.

   ```bash
   cd python-docs-samples/dataflow/run_template
   ```

1. Create a virtual environment and activate it.

   ```bash
   virtualenv env
   source env/bin/activate
   ```

   > Once you are done, you can deactivate the virtualenv and return to your global Python environment by running `deactivate`.

1. Install the sample requirements.

   ```bash
   pip install -U -r requirements.txt
   ```

## Running locally

To run a Dataflow template from the command line:

> NOTE: To run locally, you'll need to [create a service account key] as a JSON file.
> Then export an environment variable called `GOOGLE_APPLICATION_CREDENTIALS` pointing to your service account file.

```bash
python main.py \
  --project <your-gcp-project> \
  --job wordcount-$(date +'%Y%m%d-%H%M%S') \
  --template gs://dataflow-templates/latest/Word_Count \
  --inputFile gs://apache-beam-samples/shakespeare/kinglear.txt \
  --output gs://<your-gcs-bucket>/wordcount/outputs
```

## Running in Python

To run a Dataflow template from Python:

> NOTE: To run locally, you'll need to [create a service account key] as a JSON file.
> Then export an environment variable called `GOOGLE_APPLICATION_CREDENTIALS` pointing to your service account file.

```py
import main as run_template

run_template.run(
    project='your-gcp-project',
    job='unique-job-name',
    template='gs://dataflow-templates/latest/Word_Count',
    parameters={
        'inputFile': 'gs://apache-beam-samples/shakespeare/kinglear.txt',
        'output': 'gs://<your-gcs-bucket>/wordcount/outputs',
    }
)
```

## Running in Cloud Functions

To deploy this as a Cloud Function and run a Dataflow template via an HTTP request as a REST API:

```bash
PROJECT=$(gcloud config get-value project) \
REGION=$(gcloud config get-value functions/region)

# Deploy the Cloud Function.
gcloud functions deploy run_template \
  --runtime python37 \
  --trigger-http \
  --region $REGION

# Call the Cloud Function via an HTTP request.
curl -X POST "https://$REGION-$PROJECT.cloudfunctions.net/run_template" \
  -d project=$PROJECT \
  -d job=wordcount-$(date +'%Y%m%d-%H%M%S') \
  -d template=gs://dataflow-templates/latest/Word_Count \
  -d inputFile=gs://apache-beam-samples/shakespeare/kinglear.txt \
  -d output=gs://<your-gcs-bucket>/wordcount/outputs
```
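The same HTTP request can be composed in Python. A minimal sketch that builds the function URL and payload; the region, project, and bucket values are placeholders you would replace with your own:

```python
REGION = 'us-central1'        # assumption: your functions region
PROJECT = 'your-gcp-project'  # assumption: your project ID

# URL of the deployed Cloud Function (same shape as the curl call above).
url = 'https://{}-{}.cloudfunctions.net/run_template'.format(REGION, PROJECT)

# Form fields mirroring the -d flags of the curl example.
payload = {
    'project': PROJECT,
    'job': 'wordcount-example',
    'template': 'gs://dataflow-templates/latest/Word_Count',
    'inputFile': 'gs://apache-beam-samples/shakespeare/kinglear.txt',
    'output': 'gs://<your-gcs-bucket>/wordcount/outputs',
}

# With the third-party `requests` library installed, the POST would be:
# requests.post(url, data=payload)
print(url)
```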
[Apache Beam]: https://beam.apache.org/
[Google Cloud Dataflow]: https://cloud.google.com/dataflow/docs/
[`Word_Count` template]: https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/src/main/java/com/google/cloud/teleport/templates/WordCount.java

[Cloud SDK]: https://cloud.google.com/sdk/docs/
[Create a new project]: https://console.cloud.google.com/projectcreate
[Enable billing]: https://cloud.google.com/billing/docs/how-to/modify-project
[Create a service account key]: https://console.cloud.google.com/apis/credentials/serviceaccountkey
[Creating and managing service accounts]: https://cloud.google.com/iam/docs/creating-managing-service-accounts
[GCP Console IAM page]: https://console.cloud.google.com/iam-admin/iam
[Granting roles to service accounts]: https://cloud.google.com/iam/docs/granting-roles-to-service-accounts

[Install Python and virtualenv]: https://cloud.google.com/python/setup

dataflow/run_template/main.py

Lines changed: 112 additions & 0 deletions

```py
#!/usr/bin/env python
#
# Copyright 2018 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Script to run a Dataflow template."""

import json


def run(project, job, template, parameters=None):
    """Runs a Dataflow template.

    Args:
        project (str): Google Cloud project ID to run on.
        job (str): Unique Dataflow job name.
        template (str): Google Cloud Storage path to Dataflow template.
        parameters (dict): Dictionary of parameters for the specified template.

    Returns:
        The response from the Dataflow service after running the template.
    """
    parameters = parameters or {}

    # [START dataflow_run_template]
    from googleapiclient.discovery import build

    # project = 'your-gcp-project'
    # job = 'unique-job-name'
    # template = 'gs://dataflow-templates/latest/Word_Count'
    # parameters = {
    #     'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
    #     'output': 'gs://<your-gcs-bucket>/wordcount/outputs',
    # }

    service = build('dataflow', 'v1b3')
    request = service.projects().templates().launch(
        projectId=project,
        gcsPath=template,
        body={
            'jobName': job,
            'parameters': parameters,
        }
    )

    response = request.execute()
    # [END dataflow_run_template]
    return response


def run_template(request):
    """HTTP Cloud Function that runs a Dataflow template.

    Args:
        request (flask.Request): The request object.
        <http://flask.pocoo.org/docs/1.0/api/#flask.Request>
    Returns:
        The response text, or any set of values that can be turned into a
        Response object using `make_response`
        <http://flask.pocoo.org/docs/1.0/api/#flask.Flask.make_response>.
    """
    parameters = request.get_json(silent=True) or {}  # application/json
    parameters.update(request.form.to_dict())         # Form request data
    parameters.update(request.args.to_dict())         # URL parameters

    project = parameters.pop('project')
    job = parameters.pop('job')
    template = parameters.pop('template')
    response = run(
        project=project,
        job=job,
        template=template,
        parameters=parameters,
    )
    return json.dumps(response, separators=(',', ':'))


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--project', required=True,
                        help='Google Cloud project ID to run on.')
    parser.add_argument('--job', required=True,
                        help='Unique Dataflow job name.')
    parser.add_argument('--template', required=True,
                        help='Google Cloud Storage path to Dataflow template.')
    args, unknown_args = parser.parse_known_args()

    # Parse the template parameters.
    template_argparser = argparse.ArgumentParser()
    for arg in unknown_args:
        if arg.startswith('-'):
            template_argparser.add_argument(arg)
    parameters = template_argparser.parse_args(unknown_args)

    response = run(
        project=args.project,
        job=args.job,
        template=args.template,
        parameters=parameters.__dict__,
    )
    print(json.dumps(response, indent=2))
```
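The `__main__` block above uses a two-pass argparse trick: known flags are parsed first with `parse_known_args`, and any leftover `--key value` pairs become template parameters. A standalone sketch of that technique (the `--job` flag here is just an illustration, not tied to Dataflow):

```python
import argparse

def parse_dynamic_args(argv):
    """Parse fixed flags, then treat unknown --key value pairs as parameters."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--job', required=True)
    args, unknown = parser.parse_known_args(argv)

    # Build a second parser whose flags are whatever was left over.
    dynamic = argparse.ArgumentParser()
    for arg in unknown:
        if arg.startswith('-'):
            dynamic.add_argument(arg)
    parameters = vars(dynamic.parse_args(unknown))
    return args.job, parameters

job, params = parse_dynamic_args(
    ['--job', 'wordcount', '--inputFile', 'in.txt', '--output', 'out'])
print(job, params)  # wordcount {'inputFile': 'in.txt', 'output': 'out'}
```

This is why the README can say "you can run any other template": the script never hard-codes template parameter names.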

dataflow/run_template/main_test.py

Lines changed: 91 additions & 0 deletions

```py
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import flask
import json
import os
import pytest
import subprocess as sp
import time

from datetime import datetime
from werkzeug.urls import url_encode

import main

PROJECT = os.environ['GCLOUD_PROJECT']
BUCKET = os.environ['CLOUD_STORAGE_BUCKET']

# Wait time until a job can be cancelled, as a best effort.
# If it fails to be cancelled, the job will run for ~8 minutes.
WAIT_TIME = 5  # seconds


# Create a fake "app" for generating test request contexts.
@pytest.fixture(scope="module")
def app():
    return flask.Flask(__name__)


def test_run_template_empty_args(app):
    with app.test_request_context():
        with pytest.raises(KeyError):
            main.run_template(flask.request)


def test_run_template_url(app):
    args = {
        'project': PROJECT,
        'job': datetime.now().strftime('test_run_template_url-%Y%m%d-%H%M%S'),
        'template': 'gs://dataflow-templates/latest/Word_Count',
        'inputFile': 'gs://apache-beam-samples/shakespeare/kinglear.txt',
        'output': 'gs://{}/dataflow/wordcount/outputs'.format(BUCKET),
    }
    with app.test_request_context('/?' + url_encode(args)):
        res = main.run_template(flask.request)
        data = json.loads(res)
        job_id = data['job']['id']
        time.sleep(WAIT_TIME)
        assert sp.call(['gcloud', 'dataflow', 'jobs', 'cancel', job_id]) == 0


def test_run_template_data(app):
    args = {
        'project': PROJECT,
        'job': datetime.now().strftime('test_run_template_data-%Y%m%d-%H%M%S'),
        'template': 'gs://dataflow-templates/latest/Word_Count',
        'inputFile': 'gs://apache-beam-samples/shakespeare/kinglear.txt',
        'output': 'gs://{}/dataflow/wordcount/outputs'.format(BUCKET),
    }
    with app.test_request_context(data=args):
        res = main.run_template(flask.request)
        data = json.loads(res)
        job_id = data['job']['id']
        time.sleep(WAIT_TIME)
        assert sp.call(['gcloud', 'dataflow', 'jobs', 'cancel', job_id]) == 0


def test_run_template_json(app):
    args = {
        'project': PROJECT,
        'job': datetime.now().strftime('test_run_template_json-%Y%m%d-%H%M%S'),
        'template': 'gs://dataflow-templates/latest/Word_Count',
        'inputFile': 'gs://apache-beam-samples/shakespeare/kinglear.txt',
        'output': 'gs://{}/dataflow/wordcount/outputs'.format(BUCKET),
    }
    with app.test_request_context(json=args):
        res = main.run_template(flask.request)
        data = json.loads(res)
        job_id = data['job']['id']
        time.sleep(WAIT_TIME)
        assert sp.call(['gcloud', 'dataflow', 'jobs', 'cancel', job_id]) == 0
```
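The three tests above exercise the three ways `run_template` accepts parameters: URL query string, form data, and a JSON body. The merge order in the handler means URL parameters override form fields, which override the JSON body. A minimal dict-based sketch of that precedence, with no Flask required:

```python
def merge_request_parameters(json_body, form_data, url_args):
    """Later updates win: URL args > form data > JSON body."""
    parameters = dict(json_body or {})  # application/json body
    parameters.update(form_data)        # form request data
    parameters.update(url_args)         # URL parameters
    return parameters

merged = merge_request_parameters(
    {'output': 'gs://from-json', 'job': 'j'},
    {'output': 'gs://from-form'},
    {'output': 'gs://from-url'})
print(merged['output'])  # gs://from-url
```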
dataflow/run_template/requirements.txt

Lines changed: 1 addition & 0 deletions

```
google-api-python-client==1.7.9
```
