问题描述
在我的 pyspark 代码中,我有一个包含以下列的表格(添加了示例数据):
id | 姓名 | 顺序 |
---|---|---|
1 | 一 | AGCCAGC |
1 | b | ACCCAGC |
2 | 一 | AAGCAAA |
2 | c | AGCCAAA |
我正在尝试构建一个图,其中每个 ID 都是一个完全独立的子图,如果两行的名称不同并且它们之间的 Levenshtein 距离小于一个常数,则两行之间存在一条边。此外,各行的序列长度也是恒定的。
代码示例:
edge_df = source_df.join(
end_df,[F.col('source_id') == F.col('end_id'),F.col('source_name') !=
F.col('end_name')],).withColumn(
'edit_distance',F.levenshtein(F.col('source_sequence'),F.col('end_sequence')),).filter(
F.col('edit_distance') <= F.lit(job_info['cutoffs']['inclusion']),).persist()
其中 source_df 和 end_df 是与上面显示的列名称相同的数据框,前缀为 source / end (内容相同)。我还根据 ID 重新分区,因为我不需要计算具有不同 ID 的行的边。
orig_df = orig_df.repartition(PARALLELISM,'id')
我的问题是有些数据非常有偏见。有些 id 有很多行,有些行很少(例如来自具有 1014 个“名称”的真实数据):
id | row_no |
---|---|
1116 | 803683 |
9151 | 766044 |
12696 | 500045 |
5579 | 318143 |
2756 | 7083 |
2152 | 7075 |
5436 | 7050 |
最终我得到了一些正在工作的执行者,其余的因为他们完成了他们的部分而什么都不做。没有“id”作为键的重新分区无济于事,并行度很高。
序列应该是相似的,所以要么缓存 levenshtein 会有所帮助,但我也觉得 spark 应该将那些更大的 id 计数发送给多个执行程序,并让每个执行程序计算一批。它有这么多空闲的执行者......
如何优化问题?数据似乎分割得很好,但是因为它是一个交叉连接,所以更多的是关于 id 中每个名称的平均序列,然后只是堆叠多个 id,每个 id 都有少量行(因为它是 O(n^2))
我使用的是带有 10 个核心 m5d.x24large 服务器的 EMR 6.3.0
非常感谢!
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)