如何在PySpark中合并并将两个rdd合并为一个

问题描述

我得到两个RDD,并且想要合并并合并为一个RDD,如下所示:

rdd_1 = ['a1','a2','a3','a4','a5',]
rdd_2 = ['b1','b2','b3','b4','b5',]

# concat and combine these two rdd into one
rdd = ['a1_b1','a2_b2','a3_b3','a4_b4','a5_b5']

我知道我可以将这两个RDD转换为DataFrame并将其合并到spark.sql中,如下所示:

df = df.withColumn('col1_col2',concat(col('col1'),lit(' '),col('col2')))

但是对于Billons级样本来说效率不够高。
因此,我想知道RRD编程中是否有更快的方法

解决方法

我认为希望拉链并加入:

rdd_1.zip(rdd_2).map(lambda x : '_'.join(x)).collect()

或者没有lambda

rdd_1.zip(rdd_2).map('_'.join).collect()

示例:

rdd_1 = spark.sparkContext.parallelize(['a1','a2','a3','a4','a5',])
rdd_2 = spark.sparkContext.parallelize(['b1','b2','b3','b4','b5',])

rdd_1.zip(rdd_2).map(lambda x : '_'.join(x)).collect()

['a1_b1','a2_b2','a3_b3','a4_b4','a5_b5']
,

从列表中创建rdds,然后对两个rdds进行压缩,然后使用map和join对其进行迭代和合并。

rd1 = sc.parallelize(['a1',])
rd2 = sc.parallelize(['b1',])
rd1.zip(rd2).map(lambda x: x[0]+'_'+x[1]).collect()
rd1.zip(rd2).map(lambda x: '_'.join(x)).collect()
rd1.zip(rd2).map('_'.join).collect()

['a1_b1','a5_b5']