问题描述
假设我有一个 List[String] 并且我想将它与一个 RDD 对象合并,以便 RDD 中的每个对象都将列表中的每个值添加到其中:
List[String] myBands = ["Band1","Band2"];
表:乐队成员 |姓名 |仪器 | | ----- | ---------- | |斜线 |吉他| |轴 |人声|
case class BandMembers ( name:String,instrument:String );
var myRDD = BandMembersTable.map(a => new BandMembers(a.name,a.instrument));
//join the myRDD to myBands
// how do I do this?
//var result = myRdd.join/merge/union(myBands);
想要的结果: |姓名 |仪器 |乐队 | | ----- | ---------- |------| |斜线 |吉他|波段1| |斜线 |吉他|波段2| |轴 |人声|波段1| |轴 |人声|波段2|
我不太确定如何以最好的方式处理 Spark/Scala。我知道我可以转换为 DF,然后使用 spark sql 进行连接,但是 RDD 和 List 必须有更好的方法,或者我认为。
解决方法
这里的风格有点偏离,但假设你真的需要 RDD 而不是 Dataset
对于 RDD:
case class BandMembers ( name:String,instrument:String )
val myRDD = spark.sparkContext.parallelize(BandMembersTable.map(a => new BandMembers(a.name,a.instrument)))
val myBands = spark.sparkContext.parallelize(Seq("Band1","Band2"))
val res = myRDD.cartesian(myBands).map { case (a,b) => Row(a.name,a.instrument,b) }
使用数据集:
case class BandMembers ( name:String,instrument:String )
val myRDD = BandMembersTable.map(a => new BandMembers(a.name,a.instrument)).toDS
val myBands = Seq("Band1","Band2").toDS
val res = myRDD.crossJoin(myBands)
输入数据:
val BandMembersTable = Seq(BandMembers("a","b"),BandMembers("c","d"))
val myBands = Seq("Band1","Band2")
输出数据集:
+----+----------+-----+
|name|instrument|value|
+----+----------+-----+
|a |b |Band1|
|a |b |Band2|
|c |d |Band1|
|c |d |Band2|
+----+----------+-----+
带有 RDD 的 Println(这些是行)
[a,b,Band1]
[c,d,Band2]
[c,Band1]
[a,Band2]
,
考虑为此使用 RDD zip .. 来自官方文档
RDD