Apache Beam does not correctly receive Pub/Sub messages from google-cloud-storage

Problem description

I have been struggling with this for a while and cannot find a solution. I am building a pipeline that fetches data from a public Google Cloud Storage bucket and applies some transformations to it. What I am working on now is having Apache Beam pick up the Pub/Sub message that is published whenever a file is uploaded to the cloud. There is a public topic for this, projects/gcp-public-data---goes-16/topics/gcp-public-data-goes-16. The bucket is the public GOES-16 bucket: https://console.cloud.google.com/storage/browser/gcp-public-data-goes-16/. I am interested in the ABI-RadC folder in particular, so I initialized the subscriber with:

gcloud beta pubsub subscriptions create goes16-ABI-data-sub-filtered-test --project my-project --topic projects/gcp-public-data---goes-16/topics/gcp-public-data-goes-16 --message-filter='hasPrefix(attributes.objectId,"ABI-L1b-RadC/")' --enable-message-ordering
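The `hasPrefix(attributes.objectId, "ABI-L1b-RadC/")` filter tells Pub/Sub to deliver only notifications whose objectId attribute starts with that prefix; everything else is dropped server-side. A minimal plain-Python sketch of that check (an approximation of the filter semantics, not the Pub/Sub filter engine itself):

```python
def has_prefix(attributes: dict, key: str, prefix: str) -> bool:
    """Approximation of Pub/Sub's hasPrefix() filter function:
    True only when the attribute exists and starts with the prefix."""
    value = attributes.get(key)
    return value is not None and value.startswith(prefix)

# A GCS notification for a RadC object passes the filter (hypothetical objectId)...
print(has_prefix({"objectId": "ABI-L1b-RadC/2020/229/23/OR_x.nc"},
                 "objectId", "ABI-L1b-RadC/"))  # True
# ...while notifications for other products are filtered out.
print(has_prefix({"objectId": "ABI-L1b-RadF/2020/229/23/OR_x.nc"},
                 "objectId", "ABI-L1b-RadC/"))  # False
```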

This works for the most part: I receive a batch of ABI-RadC messages roughly every 5 minutes. However, I should be receiving 16 messages (one per band) every 5 minutes, because that is how often content is published to Cloud Storage. Instead, I consistently receive fewer (anywhere from 2 to 13) messages per 5-minute interval. At first I thought Cloud Storage might be misbehaving, so I checked the bucket every 5 minutes: the files are there, but I get no message about them in Apache Beam. Here is the code and output I used to debug the issue.

import apache_beam as beam
import os 
import datetime
import json

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = my_key_path

options = {
    'streaming': True
}

def print_name(message):
    result = json.loads(message)
    file_string = result['name']
    band_id = file_string[file_string.find("M6")+1:file_string.find("_G16")]
    key = file_string[file_string.find("s")+1:file_string.find("_e")]
    print(f"Message received at : {datetime.datetime.utcnow()}   key : {key}    band : {band_id}")

runner = 'DirectRunner'
opts = beam.pipeline.PipelineOptions(flags=[], **options)
# The with-block runs the pipeline and waits for it to finish on exit,
# so no explicit p.run()/wait_until_finish() is needed.
with beam.Pipeline(runner, options=opts) as p:
    sub_message = p | 'Sub' >> beam.io.ReadFromPubSub(subscription='my_sub_path')
    sub_message | 'print name' >> beam.FlatMap(print_name)

Output:

Message received at : 2020-08-16 23:19:05.360728   key : 20202292316171    band : 6C04
Message received at : 2020-08-16 23:19:18.464376   key : 20202292316171    band : 6C13
Message received at : 2020-08-16 23:19:18.980477   key : 20202292316171    band : 6C14
Message received at : 2020-08-16 23:19:19.972165   key : 20202292316171    band : 6C03
Message received at : 2020-08-16 23:19:21.116554   key : 20202292316171    band : 6C05
Message received at : 2020-08-16 23:24:03.847833   key : 20202292321171    band : 6C04
Message received at : 2020-08-16 23:24:16.814699   key : 20202292321171    band : 6C06
Message received at : 2020-08-16 23:24:17.393739   key : 20202292321171    band : 6C08
Message received at : 2020-08-16 23:29:07.558796   key : 20202292326171    band : 6C04
Message received at : 2020-08-16 23:29:21.100278   key : 20202292326171    band : 6C13
Message received at : 2020-08-16 23:29:21.771230   key : 20202292326171    band : 6C15
Message received at : 2020-08-16 23:34:15.474699   key : 20202292331171    band : 6C15
Message received at : 2020-08-16 23:34:16.006153   key : 20202292331171    band : 6C12

The key is just the file's timestamp. So you can see that I do not receive 16 messages every 5 minutes, even though the files are in the bucket. I also tried creating a new subscriber without --enable-message-ordering and without the hasPrefix filter, but it changed nothing. Any help is appreciated.
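For reference, the key/band slicing in print_name can be sanity-checked offline against a sample objectId (the name below is a hypothetical but representative RadC object name following the bucket's naming scheme):

```python
# Hypothetical GOES-16 RadC object name, matching the bucket's naming scheme.
name = ("ABI-L1b-RadC/2020/229/23/"
        "OR_ABI-L1b-RadC-M6C04_G16_s20202292316171_e20202292318544_c20202292319005.nc")

band_id = name[name.find("M6")+1:name.find("_G16")]
key = name[name.find("s")+1:name.find("_e")]

print(band_id)  # 6C04 -- the leading '6' comes from slicing one past 'M'
print(key)      # 20202292316171 -- the scan-start timestamp after '_s'
```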

Update 1

So I decided to run another test to see whether the problem was Apache Beam or the subscription I had set up. I used the following code:

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

# TODO(developer)
project_id = "fire-neural-network"
subscription_id = "custom"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message):
    objectId = message.attributes.get('objectId')
    print("Received message: {}".format(objectId))
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print("Listening for messages on {}..\n".format(subscription_path))

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result()
    except TimeoutError:
        streaming_pull_future.cancel()

This was to check whether I receive 16 messages every 5 minutes, and I do. So something must be wrong with my Apache Beam code rather than with the subscription. I also noticed that Apache Beam does not acknowledge my messages, while the code above does; I think this is the cause of the error. I am not sure how to make Beam actually acknowledge the messages, though. I looked at When does Dataflow acknowledge a message of batched items from PubSubIO?, which says to add a GroupByKey after the pub/sub read. I tried that, but it still does not work:

import apache_beam as beam
import os 
import datetime
import json

# gcloud beta pubsub subscriptions create custom --project fire-neural-network --topic projects/gcp-public-data---goes-16/topics/gcp-public-data-goes-16 --message-filter='hasPrefix(attributes.objectId,"ABI-L1b-RadC/")' --enable-message-ordering


os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "/home/n/Keys/fire-neural-network-3b8e8eff4400.json"

options = {
    'streaming': True
}

def print_name(message):
    result = json.loads(message)
    if 'ABI-L1b-RadC' in result['name']:
        file_string = result['name']
        band_id = file_string[file_string.find("M6")+1:file_string.find("_G16")]
        key = file_string[file_string.find("s")+1:file_string.find("_e")]
        print(f"Message received at : {datetime.datetime.utcnow()}   key : {key}    band : {band_id}")

output_path = 'gs://fire-neural-network'
runner = 'DirectRunner'
opts = beam.pipeline.PipelineOptions(flags=[], **options)
# The with-block runs the pipeline and waits for it to finish on exit.
with beam.Pipeline(runner, options=opts) as pipeline:
    sub_message = (
        pipeline
        | "Read PubSub Messages" >> beam.io.ReadFromPubSub(subscription='mysub')
        | "Window into" >> beam.WindowInto(beam.transforms.window.FixedWindows(5))
    )

    grouped_message = (
        sub_message
        | "Add Dummy Key" >> beam.Map(lambda elem: (None, elem))
        | "Groupby" >> beam.GroupByKey()
        | "Abandon Dummy Key" >> beam.MapTuple(lambda _, val: val)
    )

    grouped_message | "Write" >> beam.io.WriteToPubSub('mytopic')
    grouped_message | "Print" >> beam.Map(print_name)

Update 2

I made my own separate test topic and subscription to check whether it was the GCS subscription interacting with Apache Beam that caused the problem. With my own topic and subscription, all messages are acknowledged correctly; it is only this public bucket combined with Apache Beam that shows the weird behavior. I may end up scrapping my Beam pipeline and just writing my own unoptimized pipeline with Google's pub/sub API.

