问题描述
我在Google Cloud Platform中设置了一些数据处理工作流程。这些位置处理物理地址并返回有关它们的一些度量。工作流使用Cloud Functions流和PubSub流的组合。
在工作流程中使用一个Google Cloud Function时,某些消息不会从触发流中提取或被提取多次。我知道一定程度的预期。但是,这经常发生。足以在某些位置造成10倍的高估,而在另一些位置则没有结果。
我认为callback
函数不能正确地确认消息,但是我不确定为了获得更可靠的消息接收和确认应该有什么区别。任何建议表示赞赏。
我的检索指标的GCP云功能由PubSub流触发,并执行retrieve_location
函数,将数据发送到其他PubSub流。 retrieve_location
函数如下所示:
def retrieve_location(event,context):
auth_flow()
project_id = <my project id>
subscription_name = <my subscription name>
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project_id,subscription_name)
def callback(message):
message.ack()
message_obj = message.data
message_dcde = message_obj.decode('utf-8')
message_json = json.loads(message_dcde)
get_metrics(message_json)
subscriber.subscribe(subscription_path,callback=callback)
get_metrics
函数从每条消息中获取有效载荷,检索一些数据并将其发送到另一个流。该功能似乎按预期工作。
def get_metrics(loc):
<... retrieve and process data,my_data is the object that gets sent to the next stream ...>
project_id = <my project id>
topic_name = <my topic name>
topic_id = <my topic id>
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id,topic_name)
try:
publisher.publish(topic_path,data=my_data.encode('utf-8'))
except Exception as exc:
print("topic publish failed: ",exc)
解决方法
您应该创建一个background function订阅该主题的主题,该主题直接处理有效负载,而不是在Cloud Function中设置第二个Pub / Sub订阅者,例如:
def get_metrics_background_function(event,context):
message_obj = event.data
message_dcde = message_obj.decode('utf-8')
message_json = json.loads(message_dcde)
get_metrics(message_json)
,
您似乎可以直接通过Cloud Pub / Sub客户端库将使用Cloud Pub / Sub来触发Cloud Function与使用Pub / Sub进行混合。通常,您会想要做一个或另一个。
如果您创建的订阅是通过Cloud Functions完成的,则您的retrieve_location
函数实际上并不是在接收和处理消息。取而代之的是,由于subscriber.subscribe
即将运行完成,因此您的函数将完成执行,因此它正在启动订户客户端,然后不久便关闭。
如果此函数正在将客户端启动到触发Cloud Function的相同订阅,则它实际上不做任何事情,因为基于Cloud Function的订阅在客户端库时使用push模型应该与pull模型一起使用。
您要么希望直接在callback
中的retrieve_location
中执行该操作,就将该事件用作消息(如Dustin所述),或者您想与客户端建立一个持久性订户库,例如在GCE上,该库实例化订户并在其上调用subscribe
。