问题描述
集群基础:
我们有Flink独立群集,其中包含4个节点,每个节点具有16个cpu内核和32Gb物理内存,其中16 GB设置为Flink托管内存,其余全部设置为UDF和Java Heap。 因此,在每个插槽中,我们分配了1个内核和1GB的内存。
方案说明:
我们正在尝试联接两个数据集A和B,其中数据集A是
由于不能同时保证两个数据集的大小,所以在某个时间点A可以很大,而在另一个时间点数据集B可以更大。同样,一个数据集很有可能具有多个重复项列表。
例如:
数据集A的信息为
数据集B的信息大小可能为171 mb
加入关键字:位置示例,孟买,纽约等。
因此要加入此计划,我们选择了一个joinHint策略作为 Repartition_Hash_First 。该策略有时效果很好,有时会抛出以下异常,
java.lang.RuntimeException: Hash join exceeded maximum number of recursions,without reducing partitions enough to be memory resident.
Probable cause Too many duplicate keys.
因此,我们尝试使用Repartition_Hash_Second,但结果相同。
据我所知,Flink在内部为提供viz,First或Second的那一侧创建了一个哈希表,并且另一侧的数据被迭代到哈希表,反之亦然,并且由于其中一个键具有很多无法创建哈希表时无法容纳flink的实际内存,它会抛出重复键太多的异常。
因此,在第二步中,我们尝试使用Repartition_Sort_merge来实现这一点,并且得到了下面的异常,
java.lang.Exception:caused an error obtaining the sort input. the record exceeds maximum size of sort buffer.
如果我们需要将flink管理的内存增加到2 GB甚至更多,谁能建议我吗?还是我们应该采取其他不同的策略来解决这个问题?
解决方法
对我来说似乎很清楚,您的问题是重复组太大。
此外,重复组可能在两侧,该组的大小为O(n ^ 2),n为最大重复组大小。
我建议您在可能的情况下预先对双方进行重复数据删除,例如使用DeduplicateKeepLastRowFunction之类的方法。或使用行中的其他数据构建更好的键。