问题描述
我得到两个RDD
,并且想要合并并合并为一个RDD
,如下所示:
rdd_1 = ['a1','a2','a3','a4','a5',]
rdd_2 = ['b1','b2','b3','b4','b5',]
# concat and combine these two rdd into one
rdd = ['a1_b1','a2_b2','a3_b3','a4_b4','a5_b5']
我知道我可以将这两个RDD
转换为DataFrame
并将其合并到spark.sql
中,如下所示:
df = df.withColumn('col1_col2',concat(col('col1'),lit(' '),col('col2')))
但是对于Billons级样本来说效率不够高。
因此,我想知道RRD
编程中是否有更快的方法。
解决方法
我认为希望拉链并加入:
rdd_1.zip(rdd_2).map(lambda x : '_'.join(x)).collect()
或者没有lambda
:
rdd_1.zip(rdd_2).map('_'.join).collect()
示例:
rdd_1 = spark.sparkContext.parallelize(['a1','a2','a3','a4','a5',])
rdd_2 = spark.sparkContext.parallelize(['b1','b2','b3','b4','b5',])
rdd_1.zip(rdd_2).map(lambda x : '_'.join(x)).collect()
['a1_b1','a2_b2','a3_b3','a4_b4','a5_b5']
,
从列表中创建rdds,然后对两个rdds进行压缩,然后使用map和join对其进行迭代和合并。
rd1 = sc.parallelize(['a1',])
rd2 = sc.parallelize(['b1',])
rd1.zip(rd2).map(lambda x: x[0]+'_'+x[1]).collect()
rd1.zip(rd2).map(lambda x: '_'.join(x)).collect()
rd1.zip(rd2).map('_'.join).collect()
['a1_b1','a5_b5']