Closed
Labels
api: pubsub, priority: p1, release blocking, triaged for GA, type: bug
Description
This code fails with a pickle error:
import time

from google.cloud import pubsub_v1


def multiprocessing_callback(message):
    print('Received message: {}'.format(message))
    message.ack()


def receive_messages_with_multiprocessing(
        project, topic_name, subscription_name):
    """Receives messages from a pull subscription using multiprocessing."""
    import concurrent.futures
    import functools
    import multiprocessing
    import google.cloud.pubsub_v1.subscriber.policy.thread

    # Create a process pool and a queue for sending messages. This will
    # be used by the subscriber policy to execute callbacks.
    executor = concurrent.futures.ProcessPoolExecutor()
    manager = multiprocessing.Manager()
    queue = manager.Queue()
    policy_factory = functools.partial(
        google.cloud.pubsub_v1.subscriber.policy.thread.Policy,
        executor=executor,
        queue=queue)

    subscriber = pubsub_v1.SubscriberClient(policy_class=policy_factory)
    subscription_path = subscriber.subscription_path(
        project, subscription_name)
    subscriber.subscribe(subscription_path, callback=multiprocessing_callback)

    # The subscriber is non-blocking, so we must keep the main thread from
    # exiting to allow it to process messages in the background.
    print('Listening for messages on {}'.format(subscription_path))
    while True:
        time.sleep(60)
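For context on what a pickle error can mean here: anything handed to a ProcessPoolExecutor has to be serialized with pickle before it reaches a worker process. The sketch below uses only the standard library and does not touch the Pub/Sub client. `FakeMessage` and `is_picklable` are hypothetical names made up for illustration, and the assumption that the real failure involves an object holding a lock or similar handle is a guess, not something confirmed in this report.

```python
import pickle
import threading


class FakeMessage:
    """Hypothetical stand-in for an object that holds a lock.

    This is NOT the Pub/Sub Message class; it only imitates an object
    that keeps an unpicklable handle as an attribute.
    """

    def __init__(self, data):
        self.data = data
        self._lock = threading.Lock()


def is_picklable(obj):
    """Return True if pickle.dumps(obj) succeeds, False otherwise."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, TypeError, AttributeError):
        return False
    return True


# Plain data can cross a process boundary.
print(is_picklable({'data': b'hello'}))

# An object that holds a threading.Lock cannot.
print(is_picklable(FakeMessage(b'hello')))
```

Running a check like this against the callback's arguments might help narrow down which object triggers the error, though the traceback from the failing code above is the more reliable source.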
soaxelbrooke