Problem description
I've been struggling with this for a while and can't find a solution. I'm building a pipeline that pulls data from a public Google Cloud Storage bucket and applies some transformations to it. The part I'm stuck on is getting Apache Beam to receive a Pub/Sub message whenever a file is uploaded to the bucket. There is a public topic for this, projects/gcp-public-data---goes-16/topics/gcp-public-data-goes-16, and the bucket is the public GOES-16 bucket: https://console.cloud.google.com/storage/browser/gcp-public-data-goes-16/. I'm specifically interested in the ABI-RadC folder, so I created a subscription with:
gcloud beta pubsub subscriptions create goes16-ABI-data-sub-filtered-test --project my-project --topic projects/gcp-public-data---goes-16/topics/gcp-public-data-goes-16 --message-filter='hasPrefix(attributes.objectId,"ABI-L1b-RadC/")' --enable-message-ordering
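To double-check that the filter and ordering settings actually took effect, the subscription can be inspected (same project and subscription name as above; this is just a sanity check, not part of the pipeline):

gcloud pubsub subscriptions describe goes16-ABI-data-sub-filtered-test --project my-project

The output should show the filter expression and enableMessageOrdering: true.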
So, this works most of the time: I get ABI-RadC messages roughly every 5 minutes. However, I should be receiving 16 messages every 5 minutes (one for each band), since that is when content is published to Cloud Storage. Instead, I always receive fewer, anywhere from 2 to 13, in each 5-minute window. At first I thought Cloud Storage might be messing up, so I checked Google Cloud every 5 minutes: the files are there, but the corresponding messages never reach Apache Beam. Here is the code and output I used to debug this issue:
import apache_beam as beam
import os
import datetime
import json

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = my_key_path

options = {
    'streaming': True
}

def print_name(message):
    # The message payload is a JSON description of the new GCS object.
    result = json.loads(message)
    file_string = result['name']
    # Band id, e.g. "6C04": everything between "M" and "_G16" in the object name.
    band_id = file_string[file_string.find("M6") + 1:file_string.find("_G16")]
    # Scan start timestamp: everything between "s" and "_e".
    key = file_string[file_string.find("s") + 1:file_string.find("_e")]
    print(f"Message received at : {datetime.datetime.utcnow()} key : {key} band : {band_id}")

runner = 'DirectRunner'
opts = beam.pipeline.PipelineOptions(flags=[], **options)
# The "with" block runs the pipeline and, for the DirectRunner, blocks until it finishes.
with beam.Pipeline(runner, options=opts) as p:
    sub_message = p | 'Sub' >> beam.io.ReadFromPubSub(subscription='my_sub_path')
    sub_message | 'print name' >> beam.Map(print_name)
Output:
Message received at : 2020-08-16 23:19:05.360728 key : 20202292316171 band : 6C04
Message received at : 2020-08-16 23:19:18.464376 key : 20202292316171 band : 6C13
Message received at : 2020-08-16 23:19:18.980477 key : 20202292316171 band : 6C14
Message received at : 2020-08-16 23:19:19.972165 key : 20202292316171 band : 6C03
Message received at : 2020-08-16 23:19:21.116554 key : 20202292316171 band : 6C05
Message received at : 2020-08-16 23:24:03.847833 key : 20202292321171 band : 6C04
Message received at : 2020-08-16 23:24:16.814699 key : 20202292321171 band : 6C06
Message received at : 2020-08-16 23:24:17.393739 key : 20202292321171 band : 6C08
Message received at : 2020-08-16 23:29:07.558796 key : 20202292326171 band : 6C04
Message received at : 2020-08-16 23:29:21.100278 key : 20202292326171 band : 6C13
Message received at : 2020-08-16 23:29:21.771230 key : 20202292326171 band : 6C15
Message received at : 2020-08-16 23:34:15.474699 key : 20202292331171 band : 6C15
Message received at : 2020-08-16 23:34:16.006153 key : 20202292331171 band : 6C12
The key is just the file's timestamp. So you can see that, even though the files are on the cloud, I am not receiving 16 messages every 5 minutes; some are simply missing. I also tried creating a new subscription without --enable-message-ordering and without the hasPrefix filter, but it didn't change anything. Any help is appreciated.
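For reference, the unfiltered subscription mentioned above would look roughly like this (the subscription name here is hypothetical; the topic is the same public one):

gcloud pubsub subscriptions create goes16-sub-unfiltered --project my-project --topic projects/gcp-public-data---goes-16/topics/gcp-public-data-goes-16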
Update 1

I decided to run another test to see whether the problem was Apache Beam or the subscription I had set up, so I used the following code:
from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

# TODO(developer)
project_id = "fire-neural-network"
subscription_id = "custom"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message):
    objectId = message.attributes.get('objectId')
    print("Received message: {}".format(objectId))
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print("Listening for messages on {}..\n".format(subscription_path))

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result()
    except TimeoutError:
        streaming_pull_future.cancel()
This let me check whether I receive 16 messages every 5 minutes, and indeed I do. So something must be wrong with my Apache Beam code, not with the subscription. I also noticed that Apache Beam does not acknowledge my messages, whereas the code above does; I believe that is the cause of the problem. I am not sure how to make Beam actually acknowledge the messages, though. I looked at When does Dataflow acknowledge a message of batched items from PubSubIO?, which says to add a GroupByKey after the Pub/Sub read. I tried that, but it still doesn't work:
import apache_beam as beam
import os
import datetime
import json

# gcloud beta pubsub subscriptions create custom --project fire-neural-network --topic projects/gcp-public-data---goes-16/topics/gcp-public-data-goes-16 --message-filter='hasPrefix(attributes.objectId,"ABI-L1b-RadC/")' --enable-message-ordering
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "/home/n/Keys/fire-neural-network-3b8e8eff4400.json"

options = {
    'streaming': True
}

def print_name(message):
    result = json.loads(message)
    if 'ABI-L1b-RadC' in result['name']:
        file_string = result['name']
        band_id = file_string[file_string.find("M6") + 1:file_string.find("_G16")]
        key = file_string[file_string.find("s") + 1:file_string.find("_e")]
        print(f"Message received at : {datetime.datetime.utcnow()} key : {key} band : {band_id}")

output_path = 'gs://fire-neural-network'
runner = 'DirectRunner'
opts = beam.pipeline.PipelineOptions(flags=[], **options)
# The "with" block runs the pipeline and waits for it to finish.
with beam.Pipeline(runner, options=opts) as pipeline:
    sub_message = (
        pipeline
        | "Read PubSub Messages" >> beam.io.ReadFromPubSub(subscription='mysub')
        | "Window into" >> beam.WindowInto(beam.transforms.window.FixedWindows(5))
    )
    grouped_message = (
        sub_message
        | "Add Dummy Key" >> beam.Map(lambda elem: (None, elem))
        | "Groupby" >> beam.GroupByKey()
        | "Abandon Dummy Key" >> beam.MapTuple(lambda _, val: val)
    )
    grouped_message | "Write" >> beam.io.WriteToPubSub('mytopic')
    grouped_message | "Print" >> beam.Map(print_name)
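One way to test the "Beam never acks" theory directly is to pull from the same subscription while the Beam pipeline is running: anything Beam leaves unacknowledged is redelivered after the ack deadline and will show up in a manual pull. A minimal sketch, assuming the subscription is named mysub (a second consumer competes with the pipeline for messages, so only do this briefly):

gcloud pubsub subscriptions pull mysub --project fire-neural-network --limit 10

Without --auto-ack this command does not acknowledge anything itself, so it leaves the backlog unchanged.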
Update 2

I created my own separate test topic and subscription to check whether it was the GCS subscription interacting with Apache Beam that caused the problem. With my own topic and subscription, all messages are acknowledged correctly; only this public bucket combined with Apache Beam shows the weird behavior. I will probably end up scrapping my Beam pipeline and just writing my own unoptimized pipeline against Google's Pub/Sub API.
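The isolated test described above can be reproduced with something like the following (the topic and subscription names are hypothetical):

gcloud pubsub topics create beam-ack-test --project fire-neural-network
gcloud pubsub subscriptions create beam-ack-test-sub --project fire-neural-network --topic beam-ack-test
gcloud pubsub topics publish beam-ack-test --message '{"name": "test"}' --attribute objectId=test

Pointing the Beam pipeline above at beam-ack-test-sub and publishing a few such messages shows them being acknowledged normally, which is what isolates the odd behavior to the public topic.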