Problem description
case class B(b1: String, b2: Int)
val B1 = B("One", 1)
val B2 = B("Two", 2)
val dsB = spark.createDataset(Seq(B1, B2))
dsB.show()
+---+---+
| b1| b2|
+---+---+
|One| 1|
|Two| 2|
+---+---+
val m = Map(1 -> "Van")
val mapget = spark.udf.register("mapget", (b: Int) => m.get(b))
val dsB1 = dsB.withColumn("b2", mapget(dsB("b2"))).where("b2 is not null")
dsB1.show()
+---+---+
| b1| b2|
+---+---+
|One|Van|
+---+---+
val j = dsB1.joinWith(dsB,dsB1("b1") === dsB("b1"),"inner")
j.show()
+---------+-------+
|       _1|     _2|
+---------+-------+
|[One,Van]|[One,1]|
|[One,Van]|[Two,2]|
+---------+-------+
The joinWith result is incorrect: it is essentially doing a cross product. Any clue what the problem is? I have verified that the join API works correctly:
val j = dsB1.join(dsB, dsB1("b1") === dsB("b1"), "inner")
j.show()
+---+---+---+---+
| b1| b2| b1| b2|
+---+---+---+---+
|One|Van|One| 1|
+---+---+---+---+
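One way to confirm that the joinWith call is really being planned as a cross product is to look at the query plan. This is only a diagnostic sketch reusing the dsB and dsB1 defined above; the exact operator names depend on the Spark version:
// Print the logical and physical plans for the joinWith variant.
// With a trivial (always-true) join condition, the physical plan contains a
// cartesian / nested-loop join operator instead of a join keyed on b1;
// newer Spark versions instead fail with an "implicit cartesian product" error.
val jw = dsB1.joinWith(dsB, dsB1("b1") === dsB("b1"), "inner")
jw.explain(true)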
Solution
It looks like you are using a fairly old Spark version. On Spark 2.4.4, running your example throws the following exception:
org.apache.spark.sql.AnalysisException: Detected implicit cartesian product for INNER join between logical plans
LocalRelation [_1#55]
and
LocalRelation [_2#56]
Join condition is missing or trivial.
The reason is that the join condition effectively compares dsB("b1") with itself, which is always true: since dsB1 is derived from dsB and the b1 column is never changed, dsB1("b1") and dsB("b1") resolve to the same column, so the condition is trivial and the join degenerates into a cross product.
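A quick way to see this (purely diagnostic, not part of the fix) is to print the resolved expression behind each Column reference; both come back as the same attribute with the same expression ID (the actual ID will differ in your session):
// Both lines print the same attribute reference, e.g. something like b1#3,
// because dsB1 was derived from dsB and the b1 column was never touched.
println(dsB1("b1").expr)
println(dsB("b1").expr)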
A simple workaround is to rename the column, something like this:
val dsB1 = dsB.withColumn("b2", mapget(dsB("b2"))).where("b2 is not null").withColumnRenamed("b1", "b1_2")
val j = dsB1.joinWith(dsB, dsB1("b1_2") === dsB("b1"), "inner")
j.show()
+---------+-------+
|       _1|     _2|
+---------+-------+
|[One,Van]|[One,1]|
+---------+-------+
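If you would rather keep the original column name, another option is to disambiguate the self-join with dataset aliases instead of renaming. This is just a sketch along the same lines; whether it behaves correctly on your older Spark version would need to be verified:
import org.apache.spark.sql.functions.col
// Rebuild the filtered dataset without renaming b1, alias both sides,
// and refer to the columns through the aliases in the join condition.
val left = dsB.withColumn("b2", mapget(dsB("b2"))).where("b2 is not null").as("l")
val j2 = left.joinWith(dsB.as("r"), col("l.b1") === col("r.b1"), "inner")
j2.show()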