从Pub / Sub读取并处理gzip

问题描述

我正在建立一个管道来通过Google Cloud Dataflow(Python SDK)处理一些压缩的JSON消息。更准确地说,在将这些JSON文件分组(每组4个)并以gzip格式进行压缩后,才能在Google Cloud Pub / Sub上发布。但是,发布后,我不了解如何解压缩它们,然后为每个JSON消息创建一个PCollection(即4个PCollection)。
如果在ReadFromPubSub步骤之后登录PCollection,我将获得类似的信息:

INFO:root:b'\x1f\xe2\x80\xb9\x08\...\x06\x00\x00'  

我想这是消息的主体(字节对象)。
管道如下:

with beam.Pipeline(options=pipeline_options) as pipeline:
    events = (
        pipeline
        | 'Read PubSub Messages' >> beam.io.ReadFromPubSub(subscription=params["input_subscription"])
        | 'Logging zipped' >> beam.ParDo(beam_utils.LogFn())            
        | 'Uncompress data' >> beam.ParDo(UncompressData())
    )  

其中:

class UncompressData(beam.DoFn):
    def process(self,element):
        decompressed_byte_data = zlib.decompress(element,zlib.MAX_WBITS|32)
        yield decompressed_byte_data

但是它不起作用,引发zlib.error(与zlib.MAX_WBITS|16相同):

zlib.error: Error -3 while decompressing data: incorrect header check [while running 'Uncompress data']

以前有人遇到过类似的问题吗?欢迎任何建议或解决方案!

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)