feat(storagetransfer): Adds event driven transfer samples (#10622) · kevingoh/python-docs-samples@f33ae9e

Commit f33ae9e

Authored by JesseLovelace, gcf-owl-bot[bot], msampathkumar, and m-strzelczyk
feat(storagetransfer): Adds event driven transfer samples (GoogleCloudPlatform#10622)
* feat(storagetransfer): Adds event driven transfer samples

* 🦉 Updates from OwlBot post-processor

  See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Update storagetransfer/event_driven_aws_transfer.py

  Co-authored-by: Sampath Kumar <sampathm@google.com>

* fix typo in event_driven_aws_transfer.py

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
Co-authored-by: Sampath Kumar <sampathm@google.com>
Co-authored-by: Maciej Strzelczyk <strzelczyk@google.com>
1 parent 2f12a30 commit f33ae9e

7 files changed: +375 -2 lines changed

storagetransfer/conftest.py

Lines changed: 39 additions & 1 deletion
@@ -22,7 +22,7 @@
 from azure.storage.blob import BlobServiceClient, ContainerClient
 import boto3
-from google.cloud import secretmanager, storage, storage_transfer
+from google.cloud import pubsub_v1, secretmanager, storage, storage_transfer
 from google.cloud.storage_transfer import TransferJob
 
 import pytest
@@ -367,3 +367,41 @@ def manifest_file(source_bucket: storage.Bucket):
 
     # use arbitrary path and name
     yield f"gs://{source_bucket.name}/test-manifest.csv"
+
+
+@pytest.fixture
+def pubsub_id(project_id: str):
+    """
+    Yields a pubsub subscription ID. Deletes it afterwards.
+    """
+    publisher = pubsub_v1.PublisherClient()
+    topic_id = f"pubsub-sts-topic-{uuid.uuid4()}"
+    topic_path = publisher.topic_path(project_id, topic_id)
+    publisher.create_topic(request={"name": topic_path})
+
+    subscriber = pubsub_v1.SubscriberClient()
+    subscription_id = f"pubsub-sts-subscription-{uuid.uuid4()}"
+    subscription_path = subscriber.subscription_path(project_id, subscription_id)
+    subscription = subscriber.create_subscription(
+        request={"name": subscription_path, "topic": topic_path}
+    )
+
+    yield str(subscription.name)
+
+    subscriber.delete_subscription(request={"subscription": subscription_path})
+    subscriber.close()
+    publisher.delete_topic(request={"topic": topic_path})
+
+
+@pytest.fixture
+def sqs_queue_arn(secret_cache):
+    """
+    Yields an AWS SQS queue ARN. Deletes it afterwards.
+    """
+    sqs = boto3.resource("sqs", **aws_key_pair(secret_cache), region_name="us-west-1")
+    queue_name = f"sqs-sts-queue-{uuid.uuid4()}"
+    queue = sqs.create_queue(QueueName=queue_name)
+
+    yield queue.attributes["QueueArn"]
+
+    queue.delete()
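
For reference, a minimal sketch (not part of the commit) of how a test can consume these new fixtures. pytest injects each fixture by parameter name and runs everything after the fixture's yield as teardown, so the topic, subscription, and queue are cleaned up automatically; the test name here is hypothetical:

def test_fixture_wiring(pubsub_id: str, sqs_queue_arn: str):
    # pubsub_id is a full subscription resource name, e.g.
    # projects/<project>/subscriptions/pubsub-sts-subscription-<uuid>
    assert "/subscriptions/" in pubsub_id
    # sqs_queue_arn is an ARN such as
    # arn:aws:sqs:us-west-1:<account-id>:sqs-sts-queue-<uuid>
    assert sqs_queue_arn.startswith("arn:aws:sqs:")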
storagetransfer/event_driven_aws_transfer.py

Lines changed: 121 additions & 0 deletions
@@ -0,0 +1,121 @@
+#!/usr/bin/env python
+
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Command-line sample that creates an event-driven transfer from an AWS S3 bucket to a GCS bucket by tracking an AWS SQS queue.
+"""
+
+import argparse
+
+# [START storagetransfer_create_event_driven_aws_transfer]
+
+from google.cloud import storage_transfer
+
+
+def create_event_driven_aws_transfer(
+    project_id: str,
+    description: str,
+    source_s3_bucket: str,
+    sink_gcs_bucket: str,
+    sqs_queue_arn: str,
+    aws_access_key_id: str,
+    aws_secret_access_key: str,
+):
+    """Create an event-driven transfer from an AWS S3 bucket to a GCS bucket that tracks an AWS SQS queue"""
+
+    client = storage_transfer.StorageTransferServiceClient()
+
+    # The ID of the Google Cloud Platform Project that owns the job
+    # project_id = 'my-project-id'
+
+    # A description of this job
+    # description = 'Creates an event-driven transfer that tracks an SQS queue'
+
+    # AWS S3 source bucket name
+    # source_s3_bucket = 'my-s3-source-bucket'
+
+    # Google Cloud Storage destination bucket name
+    # sink_gcs_bucket = 'my-gcs-destination-bucket'
+
+    # The ARN of the SQS queue to subscribe to
+    # sqs_queue_arn = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'
+
+    # AWS Access Key ID. Should be accessed via environment variable for security purposes.
+    # aws_access_key_id = 'AKIA...'
+
+    # AWS Secret Access Key. Should be accessed via environment variable for security purposes.
+    # aws_secret_access_key = 'HEAoMK2.../...ku8'
+
+    transfer_job_request = storage_transfer.CreateTransferJobRequest(
+        {
+            "transfer_job": {
+                "project_id": project_id,
+                "description": description,
+                "status": storage_transfer.TransferJob.Status.ENABLED,
+                "transfer_spec": {
+                    "aws_s3_data_source": {
+                        "bucket_name": source_s3_bucket,
+                        "aws_access_key": {
+                            "access_key_id": aws_access_key_id,
+                            "secret_access_key": aws_secret_access_key,
+                        },
+                    },
+                    "gcs_data_sink": {
+                        "bucket_name": sink_gcs_bucket,
+                    },
+                },
+                "event_stream": {
+                    "name": sqs_queue_arn,
+                },
+            },
+        }
+    )
+
+    result = client.create_transfer_job(transfer_job_request)
+    print(f"Created transferJob: {result.name}")
+
+
+# [END storagetransfer_create_event_driven_aws_transfer]
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description=__doc__)
+    parser.add_argument(
+        "--project-id",
+        help="The ID of the Google Cloud Platform Project that owns the job",
+        required=True,
+    )
+    parser.add_argument(
+        "--description",
+        help="A useful description for your transfer job",
+        default="My transfer job",
+    )
+    parser.add_argument(
+        "--source-s3-bucket", help="AWS S3 source bucket name", required=True
+    )
+    parser.add_argument(
+        "--sink-gcs-bucket",
+        help="Google Cloud Storage destination bucket name",
+        required=True,
+    )
+    parser.add_argument(
+        "--sqs-queue-arn",
+        help="The ARN of the AWS SQS queue to track",
+        required=True,
+    )
+
+    args = parser.parse_args()
+
+    create_event_driven_aws_transfer(**vars(args))
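
Note that the __main__ block above defines no --aws-access-key-id or --aws-secret-access-key flags, so create_event_driven_aws_transfer(**vars(args)) would raise a TypeError for the two missing keyword arguments when run from the command line; the test below sidesteps this by calling the function directly. A minimal sketch (not part of the commit) that does the same, reading the keys from environment variables as the sample's comments recommend; the project, bucket, and queue names are placeholders:

import os

from event_driven_aws_transfer import create_event_driven_aws_transfer

create_event_driven_aws_transfer(
    project_id="my-project-id",  # placeholder project
    description="Event-driven S3 to GCS transfer",
    source_s3_bucket="my-s3-source-bucket",  # placeholder buckets
    sink_gcs_bucket="my-gcs-destination-bucket",
    sqs_queue_arn="arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue",
    # Keys come from the environment, never hard-coded
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
)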
storagetransfer/event_driven_aws_transfer_test.py

Lines changed: 54 additions & 0 deletions
@@ -0,0 +1,54 @@
+# Copyright 2023 Google LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import backoff
+from google.api_core.exceptions import RetryError
+from google.api_core.exceptions import ServiceUnavailable
+from google.cloud.storage import Bucket
+
+import event_driven_aws_transfer
+
+
+@backoff.on_exception(
+    backoff.expo,
+    (
+        RetryError,
+        ServiceUnavailable,
+    ),
+    max_time=60,
+)
+def test_event_driven_aws_transfer(
+    capsys,
+    project_id: str,
+    job_description_unique: str,
+    aws_source_bucket: str,
+    destination_bucket: Bucket,
+    sqs_queue_arn: str,
+    aws_access_key_id: str,
+    aws_secret_access_key: str,
+):
+    event_driven_aws_transfer.create_event_driven_aws_transfer(
+        project_id=project_id,
+        description=job_description_unique,
+        source_s3_bucket=aws_source_bucket,
+        sink_gcs_bucket=destination_bucket.name,
+        sqs_queue_arn=sqs_queue_arn,
+        aws_access_key_id=aws_access_key_id,
+        aws_secret_access_key=aws_secret_access_key,
+    )
+
+    out, _ = capsys.readouterr()
+
+    # Ensure the transferJob has been created
+    assert "Created transferJob:" in out
storagetransfer/event_driven_gcs_transfer.py

Lines changed: 109 additions & 0 deletions
@@ -0,0 +1,109 @@
+#!/usr/bin/env python
+
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Command-line sample that creates an event-driven transfer between two GCS buckets that tracks a PubSub subscription.
+"""
+
+import argparse
+
+# [START storagetransfer_create_event_driven_gcs_transfer]
+
+from google.cloud import storage_transfer
+
+
+def create_event_driven_gcs_transfer(
+    project_id: str,
+    description: str,
+    source_bucket: str,
+    sink_bucket: str,
+    pubsub_id: str,
+):
+    """Create an event-driven transfer between two GCS buckets that tracks a PubSub subscription"""
+
+    client = storage_transfer.StorageTransferServiceClient()
+
+    # The ID of the Google Cloud Platform Project that owns the job
+    # project_id = 'my-project-id'
+
+    # A description of this job
+    # description = 'Creates an event-driven transfer that tracks a pubsub subscription'
+
+    # Google Cloud Storage source bucket name
+    # source_bucket = 'my-gcs-source-bucket'
+
+    # Google Cloud Storage destination bucket name
+    # sink_bucket = 'my-gcs-destination-bucket'
+
+    # The Pubsub Subscription ID to track
+    # pubsub_id = 'projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID'
+
+    transfer_job_request = storage_transfer.CreateTransferJobRequest(
+        {
+            "transfer_job": {
+                "project_id": project_id,
+                "description": description,
+                "status": storage_transfer.TransferJob.Status.ENABLED,
+                "transfer_spec": {
+                    "gcs_data_source": {
+                        "bucket_name": source_bucket,
+                    },
+                    "gcs_data_sink": {
+                        "bucket_name": sink_bucket,
+                    },
+                },
+                "event_stream": {
+                    "name": pubsub_id,
+                },
+            },
+        }
+    )
+
+    result = client.create_transfer_job(transfer_job_request)
+    print(f"Created transferJob: {result.name}")
+
+
+# [END storagetransfer_create_event_driven_gcs_transfer]
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description=__doc__)
+    parser.add_argument(
+        "--project-id",
+        help="The ID of the Google Cloud Platform Project that owns the job",
+        required=True,
+    )
+    parser.add_argument(
+        "--description",
+        help="A useful description for your transfer job",
+        default="My transfer job",
+    )
+    parser.add_argument(
+        "--source-bucket", help="Google Cloud Storage source bucket name", required=True
+    )
+    parser.add_argument(
+        "--sink-bucket",
+        help="Google Cloud Storage destination bucket name",
+        required=True,
+    )
+    parser.add_argument(
+        "--pubsub-id",
+        help="The subscription ID of the PubSub queue to track",
+        required=True,
+    )
+
+    args = parser.parse_args()
+
+    create_event_driven_gcs_transfer(**vars(args))
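
The job only sees events if the source bucket actually publishes object-change notifications to the topic behind the tracked subscription, and the commit does not include that wiring (the conftest fixture only creates the topic and subscription). A hedged sketch (not part of the commit) of one way to set it up with the google-cloud-storage notification API; the project, bucket, and topic names are placeholders:

from google.cloud import storage

client = storage.Client(project="my-project-id")  # placeholder project
bucket = client.bucket("my-gcs-source-bucket")  # placeholder bucket

# Publish OBJECT_FINALIZE events (new or overwritten objects) to the topic
# that the tracked subscription is attached to.
notification = bucket.notification(
    topic_name="pubsub-sts-topic",  # placeholder topic ID
    payload_format="JSON_API_V1",
    event_types=["OBJECT_FINALIZE"],
)
notification.create()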
storagetransfer/event_driven_gcs_transfer_test.py

Lines changed: 50 additions & 0 deletions
@@ -0,0 +1,50 @@
+# Copyright 2023 Google LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import backoff
+from google.api_core.exceptions import RetryError
+from google.api_core.exceptions import ServiceUnavailable
+from google.cloud.storage import Bucket
+
+import event_driven_gcs_transfer
+
+
+@backoff.on_exception(
+    backoff.expo,
+    (
+        RetryError,
+        ServiceUnavailable,
+    ),
+    max_time=60,
+)
+def test_event_driven_gcs_transfer(
+    capsys,
+    project_id: str,
+    job_description_unique: str,
+    source_bucket: Bucket,
+    destination_bucket: Bucket,
+    pubsub_id: str,
+):
+    event_driven_gcs_transfer.create_event_driven_gcs_transfer(
+        project_id=project_id,
+        description=job_description_unique,
+        source_bucket=source_bucket.name,
+        sink_bucket=destination_bucket.name,
+        pubsub_id=pubsub_id,
+    )
+
+    out, _ = capsys.readouterr()
+
+    # Ensure the transferJob has been created
+    assert "Created transferJob:" in out

storagetransfer/requirements-test.txt

Lines changed: 1 addition & 0 deletions
@@ -2,6 +2,7 @@ azure-storage-blob==12.16.0
 backoff==2.2.1; python_version < "3.7"
 backoff==2.2.1; python_version >= "3.7"
 boto3==1.26.150
+google-cloud-pubsub==2.18.4
 google-cloud-storage==2.9.0; python_version < '3.7'
 google-cloud-storage==2.9.0; python_version > '3.6'
 google-cloud-secret-manager==2.16.1

0 commit comments