问题描述
我正在尝试将两个不同的Google pub / sub订阅者设置为不同的订阅,但是使用相同的代码。为了画出更好的图片,请说我有topic1和topic2。然后,我有订阅了topic1的subscription1和订阅了topic2的subscription2。然后,我有订户1与订户1链接,订户2与订户2链接。我的问题是如何在同一应用程序中使用Subscriber1和Subscriber2。我只有1个订阅者的示例是(来自文档):
project_id = "my-project-id"
subscription_id = "subscription1"
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id,subscription_id)
streaming_pull_future = subscriber.subscribe(subscription_path,callback=callback)
logging.info("Listening for messages on {}..\n".format(subscription_path))
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
try:
# When `timeout` is not set,result() will block indefinitely,# unless an exception is encountered first.
streaming_pull_future.result()
except TimeoutError:
streaming_pull_future.cancel()
如何在其中添加subscription2,以便我的python应用程序可以同时从topic1和topic2获取消息?我在文档中找不到它,但是如果我只是想念它,就让我知道!
解决方法
如果要同时接收来自两个订阅的消息,请创建两个SubscriberClient实例,每个订阅实例一个。要合并期货,可以使用事件:
project_id = "my-project-id"
subscription_id1 = "subscription1"
subscription_id2 = "subscription2"
subscriber1 = pubsub_v1.SubscriberClient()
subscriber2 = pubsub_v1.SubscriberClient()
subscription_path1 = subscriber.subscription_path(project_id,subscription_id1)
subscription_path2 = subscriber.subscription_path(project_id,subscription_id2)
streaming_pull_future1 = subscriber1.subscribe(subscription_path1,callback=callback)
logging.info("Listening for messages on {}.".format(subscription_path1))
streaming_pull_future2 = subscriber2.subscribe(subscription_path2,callback=callback)
logging.info("Listening for messages on {}.".format(subscription_path2))
subscriber_shutdown = threading.Event()
streaming_pull_future1.add_done_callback(lambda result: subscriber_shutdown.set())
streaming_pull_future2.add_done_callback(lambda result: subscriber_shutdown.set())
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber1,subscriber2:
subscriber_shutdown.wait()
streaming_pull_future1.cancel()
streaming_pull_future2.cancel()