Apache Beam GroupByKey使用PubSubIO

问题描述

我们需要通过消息中的字段之一对PubSub消息进行分组。我们使用15分钟的固定窗口将这些消息分组。

在数据流上运行时,用于消息分组的GroupByKey引入了太多重复元素,管道远端的另一个GroupByKey失败,原因为'KeyCommitTooLargeException:对阶段P27的提交请求和密钥abc#123的大小为225337153,超过限制。'

我浏览了以下链接,发现建议使用Reshuffle,但Reshuffle内部具有GroupByKey。 Why is GroupByKey in beam pipeline duplicating elements (when run on Google Dataflow)?

我的管道代码

PCollection<String> messages = getReadPubSubSubscription(options,pipeline);

PCollection<String> windowedMessages = messages
    .apply(
        Window
            .<String>into(
                FixedWindows.of(Duration.standardMinutes(15)))
            .discardingFiredPanes());
            
PCollectionTuple objectsTuple = windowedMessages
    .apply(
        "UnmarshalStrings",ParDo
            .of(new StringUnmarshallFn())
            .withOutputTags(
                StringUnmarshallFn.mainOutputTag,TupleTagList.of(StringUnmarshallFn.deadLetterTag)));

PCollection<KV<String,Iterable<ABCObject>>> groupedobjects =
    objectsTuple.get(StringUnmarshallFn.mainOutputTag)
        .apply(
            "GroupByObjects",GroupByKey.<String,ABCObject>create());

PCollection results = groupedobjects
    .apply(
        "FetchForEachKey",ParDo.of(SomeFn())).get(SomeFn.tag)
    .apply(
        "Reshuffle",Reshuffle.viarandomKey());
                
results.apply(...)

...

PubSub不能肯定地复制消息,并且没有其他失败,GroupByKey正在创建这些重复项,我使用的Windowing出了点问题吗?

一个观察结果是GroupBy产生的元素数量与下一步产生的元素相同。我要附加两个屏幕截图,一个用于GroupByKey,另一个用于获取功能

GroupByKey步骤

GroupByKey

获取步骤

Fetch

更新,经过其他分析

阶段P27实际上是第一个GroupByKey,它输出的元素比预期的要多。我看不到它们是实际输出元素的重复,因为所有这100万个元素都不会在下一个提取步骤中处理。我不确定这些是数据流引入的一些虚拟元素还是来自数据流的错误度量。

我仍在进一步分析为什么会抛出此KeyCommitTooLargeException,因为我只有一个输入元素,并且分组只能产生一个可迭代的元素。我也已经在Google开了票。

解决方法

docker ps -a按键和窗口分组。如果没有触发器,则每个键和窗口仅输出一个元素,每个输入元素最多也可以输出1个元素。

如果您看到任何其他行为,则可能是错误,可以报告。您可能需要提供更多步骤来重现此问题,包括示例数据和整个可运行管道。

,

由于您已在UPDATE中澄清没有重复,而是以某种方式添加了伪记录(真是奇怪),this old thread报告了类似的问题,并且答案很有趣,因为指出了protobuf序列化问题是由于在单个窗口中对大量数据进行分组造成的。

我建议使用可用的故障排除步骤(例如12)来确定问题是从代码的哪一部分开始的。例如,我仍然认为new StringUnmarshallFn()可能正在执行有助于生成虚拟记录的任务。您可能需要在步骤中实施counters,以尝试确定每个步骤生成了多少记录。

如果找不到解决方案,最好的选择是联系GCP Support,也许他们可以解决。