Apache Flink中的哈希联接和排序合并异常

问题描述

集群基础:

我们有Flink独立群集,其中包含4个节点,每个节点具有16个cpu内核和32Gb物理内存,其中16 GB设置为Flink托管内存,其余全部设置为UDF和Java Heap。 因此,在每个插槽中,我们分配了1个内核和1GB的内存。

方案说明:

我们正在尝试联接两个数据集A和B,其中数据集A是 元组,数据集B具有自定义POJO 和联接键这两个数据集都是 String

由于不能同时保证两个数据集的大小,所以在某个时间点A可以很大,而在另一个时间点数据集B可以更大。同样,一个数据集很有可能具有多个重复项列表。

例如: 数据集A的信息为 size = 51 mb
数据集B的信息大小可能为171 mb
加入关键字:位置示例,孟买,纽约等。

因此要加入此计划,我们选择了一个joinHint策略作为 Repartition_Hash_First 。该策略有时效果很好,有时会抛出以下异常,

java.lang.RuntimeException: Hash join exceeded maximum number of recursions,without reducing partitions enough to be memory resident.
Probable cause Too many duplicate keys.

因此,我们尝试使用Repartition_Hash_Second,但结果相同。

据我所知,Flink在内部为提供viz,First或Second的那一侧创建了一个哈希表,并且另一侧的数据被迭代到哈希表,反之亦然,并且由于其中一个键具有很多无法创建哈希表时无法容纳flink的实际内存,它会抛出重复键太多的异常。

因此,在第二步中,我们尝试使用Repartition_Sort_merge来实现这一点,并且得到了下面的异常,

java.lang.Exception:caused an error obtaining the sort input. the record exceeds maximum size of sort buffer.

如果我们需要将flink管理的内存增加到2 GB甚至更多,谁能建议我吗?还是我们应该采取其他不同的策略来解决这个问题?

解决方法

对我来说似乎很清楚,您的问题是重复组太大。

此外,重复组可能在两侧,该组的大小为O(n ^ 2),n为最大重复组大小。

我建议您在可能的情况下预先对双方进行重复数据删除,例如使用DeduplicateKeepLastRowFunction之类的方法。或使用行中的其他数据构建更好的键。