使用Apache Beam和Python

问题描述

我正在尝试使用Apache Beam(https://cloud.google.com/pubsub/docs/publisher)将带有ordering_key的Google PubSub消息写入主题。尽管带有ordering_key的Google Pubsub是beta版功能,但我仍可以使用普通的PubSub客户端库发布消息。我希望也能够在Apache Beam中做到这一点。但是,它似乎对Python Apache Beam库不可用。 我一直在尝试重写beam.io.WritetoPubSub(通过更改_to_proto_str)以使用ordering_key(https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage)编写protobuf消息。最后消息将是这样

  "data": string,"attributes": {
    string: string,...
  },"messageId": string,"publishTime": string,"ordering_key": string
}
sdks.python.apache_beam.io.gcp.pubsub.PubsubMessage._to_proto_str
  def _to_proto_str(self):
    msg = pubsub.types.pubsub_pb2.PubsubMessage()
    msg.data = self.data
    for key,value in iteritems(self.attributes):
      msg.attributes[key] = value
    msg.ordering_key = self.ordering_key
    return msg.SerializetoString()

但是,当我查看结束于主题中的结束消息时,ordering_key似乎消失了。在最坏的情况下,我想我也可以使用PubSub客户端发布消息。 但是,如果有人能为我指出正确的方向,那会更好。我知道apache Beam项目的贡献者必须做过类似的事情,因为他们不久前就包含了更改PubSub消息属性功能

已更新:Apache Beam 2.24.0依赖于PubSub Client库的旧版本。我认为是因为他们希望保留对Python 2更长的支持。但是,一切都可能在10月7日左右结束(至少对于Google而言,在那之后他们将停止对Dataflow的Python 2支持)。其他所有人可能需要等待2.24.0之后的任何版本。

解决此问题,我已成功在Apache Beam 2.24.0的顶部安装了最新的PubSub客户端库。并创建新的自定义PubSub IO作为DoFn(您只需覆盖设置方法并在其中创建Publisher Client)。我现在可以使用订购密钥发布消息。但是,我不确定是否有任何经纪人因为我的变更而定,是否适合演示用途。

 def setup(self):
        publisher_options = pubsub_v1.types.PublisherOptions(
            enable_message_ordering=True
        )
        self.publisher = pubsub_v1.PublisherClient(
            publisher_options=publisher_options,batch_settings=pubsub_v1.types.BatchSettings()
        )

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)