8000 Cloud Pub/Sub Quickstart V2 by anguillanneuf · Pull Request #2004 · GoogleCloudPlatform/python-docs-samples · GitHub
[go: up one dir, main page]

Skip to content

Cloud Pub/Sub Quickstart V2 #2004

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Feb 12, 2019
73 changes: 73 additions & 0 deletions pubsub/cloud-client/quickstart/pub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#!/usr/bin/env python

# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START pubsub_quickstart_pub_all]
import argparse
import time
# [START pubsub_quickstart_pub_deps]
from google.cloud import pubsub_v1
# [END pubsub_quickstart_pub_deps]


def get_callback(api_future, data):
"""Wrap message data in the context of the callback function."""

def callback(api_future):
try:
print("Published message {} now has message ID {}".format(
data, api_future.result()))
except Exception:
print("A problem occurred when publishing {}: {}\n".format(
data, api_future.exception()))
raise
return callback


def pub(project_id, topic_name):
"""Publishes a message to a Pub/Sub topic."""
# [START pubsub_quickstart_pub_client]
# Initialize a Publisher client
client = pubsub_v1.PublisherClient()
# [END pubsub_quickstart_pub_client]
# Create a fully qualified identifier in the form of
# `projects/{project_id}/topics/{topic_name}`
topic_path = client.topic_path(project_id, topic_name)

# Data sent to Cloud Pub/Sub must be a bytestring
data = b"Hello, World!"

# When you publish a message, the client returns a future.
api_future = client.publish(topic_path, data=data)
api_future.add_done_callback(get_callback(api_future, data))

# Keep the main thread from exiting until background message
# is processed.
while api_future.running():
time.sleep(0.1)


if __name__ == '__main__':
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument('project_id', help='Google Cloud project ID')
parser.add_argument('topic_name', help='Pub/Sub topic name')

args = parser.parse_args()

pub(args.project_id, args.topic_name)
# [END pubsub_quickstart_pub_all]
61 changes: 61 additions & 0 deletions pubsub/cloud-client/quickstart/pub_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#!/usr/bin/env python

# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import pytest

from google.api_core.exceptions import AlreadyExists
from google.cloud import pubsub_v1

import pub

PROJECT = os.environ['GCLOUD_PROJECT']
TOPIC = 'quickstart-pub-test-topic'


@pytest.fixture(scope='module')
def publisher_client():
yield pubsub_v1.PublisherClient()


@pytest.fixture(scope='module')
def topic(publisher_client):
topic_path = publisher_client.topic_path(PROJECT, TOPIC)

try:
publisher_client.create_topic(topic_path)
except AlreadyExists:
pass

yield TOPIC


@pytest.fixture
def to_delete(publisher_client):
doomed = []
yield doomed
for item in doomed:
publisher_client.delete_topic(item)


def test_pub(publisher_client, topic, to_delete, capsys):
pub.pub(PROJECT, topic)

to_delete.append('projects/{}/topics/{}'.format(PROJECT, TOPIC))

out, _ = capsys.readouterr()

assert "Published message b'Hello, World!'" in out
64 changes: 64 additions & 0 deletions pubsub/cloud-client/quickstart/sub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#!/usr/bin/env python

# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START pubsub_quickstart_sub_all]
import argparse
import time
# [START pubsub_quickstart_sub_deps]
from google.cloud import pubsub_v1
# [END pubsub_quickstart_sub_deps]


def sub(project_id, subscription_name):
"""Receives messages from a Pub/Sub subscription."""
# [START pubsub_quickstart_sub_client]
# Initialize a Subscriber client
client = pubsub_v1.SubscriberClient()
# [END pubsub_quickstart_sub_client]
# Create a fully qualified identifier in the form of
# `projects/{project_id}/subscriptions/{subscription_name}`
subscription_path = client.subscription_path(
project_id, subscription_name)

def callback(message):
print('Received message {} of message ID {}'.format(
message, message.message_id))
# Acknowledge the message. Unack'ed messages will be redelivered.
message.ack()
print('Acknowledged message of message ID {}\n'.format(
message.message_id))

client.subscribe(subscription_path, callback=callback)
print('Listening for messages on {}..\n'.format(subscription_path))

# Keep the main thread from exiting so the subscriber can
# process messages in the background.
while True:
time.sleep(60)


if __name__ == '__main__':
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument('project_id', help='Google Cloud project ID')
parser.add_argument('subscription_name', help='Pub/Sub subscription name')

args = parser.parse_args()

sub(args.project_id, args.subscription_name)
# [END pubsub_quickstart_sub_all]
113 changes: 113 additions & 0 deletions pubsub/cloud-client/quickstart/sub_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
#!/usr/bin/env python

# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import mock
import os
import pytest
import time

from google.api_core.exceptions import AlreadyExists
from google.cloud import pubsub_v1

import sub


PROJECT = os.environ['GCLOUD_PROJECT']
TOPIC = 'quickstart-sub-test-topic'
SUBSCRIPTION = 'quickstart-sub-test-topic-sub'


@pytest.fixture(scope='module')
def publisher_client():
yield pubsub_v1.PublisherClient()


@pytest.fixture(scope='module')
def topic_path(publisher_client):
topic_path = publisher_client.topic_path(PROJECT, TOPIC)

try:
publisher_client.create_topic(topic_path)
except AlreadyExists:
pass

yield topic_path


@pytest.fixture(scope='module')
def subscriber_client():
yield pubsub_v1.SubscriberClient()


@pytest.fixture(scope='module')
def subscription(subscriber_client, topic_path):
subscription_path = subscriber_client.subscription_path(
PROJECT, SUBSCRIPTION)

try:
subscriber_client.create_subscription(subscription_path, topic_path)
except AlreadyExists:
pass

yield SUBSCRIPTION


@pytest.fixture
def to_delete(publisher_client, subscriber_client):
doomed = []
yield doomed
for client, item in doomed:
if 'topics' in item:
publisher_client.delete_topic(item)
if 'subscriptions' in item:
subscriber_client.delete_subscription(item)


def _make_sleep_patch():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer using the monkeypatch fixture for pytest. It can clean up after your tests so that the sleep function is restored after the test is done.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need some help with this. In the meanwhile, since other tests are also written using _make_sleep_patch in the same repo, shall we start another PR just for this purpose?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the pattern I'm describing: https://stackoverflow.com/a/29110609/101923,

Now that I have a closer look, mock.patch does the same thing, but this still seems much more complicated than it needs to be.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried monkeypatch but it created an infinite loop.

def mock_sleep(sleep):
    time.sleep(10)
    raise RuntimeError('sigil')

monkeypatch.setattr(time, 'sleep', mock_sleep)

while True: time.sleep(60) would call time.sleep(10), which calls time.sleep(10) and never reaches raise RuntimeError('sigil').

real_sleep = time.sleep

def new_sleep(period):
if period == 60:
real_sleep(10)
raise RuntimeError('sigil')
else:
real_sleep(period)

return mock.patch('time.sleep', new=new_sleep)


def test_sub(publisher_client,
topic_path,
subscriber_client,
subscription,
to_delete,
capsys):

publisher_client.publish(topic_path, data=b'Hello, World!')

to_delete.append((publisher_client, topic_path))

with _make_sleep_patch():
with pytest.raises(RuntimeError, match='sigil'):
sub.sub(PROJECT, subscription)

to_delete.append((subscriber_client,
'projects/{}/subscriptions/{}'.format(PROJECT,
SUBSCRIPTION)))

out, _ = capsys.readouterr()
assert "Received message" in out
assert "Acknowledged message" in out
0