如何在 Spark Scala 中进行类型安全数据集左连接

问题描述

背景

关于 Spark 的数据集 API 如何实际上没有提供完全类型安全的连接,有很多令人担忧的问题,但我很固执,所以我真的想尽可能地接近(不使用一些外部库)。

我能够通过内部连接做到这一点。但是,有时我想做一个左(或右)外连接,但不知道如何使类型签名正确。

问题

具体来说,

如果无论(左)外连接类型如何,此函数都不返回任一侧的选项,如何使用 Dataset.joinWith(rightDS,condition,"left")

它似乎需要在“外部”侧(如果进行左连接,则为右侧)返回一个 Option 或根本不返回不可连接的行(即将成为内部连接而不是真正的外部连接)。不知何故,当我将“外部”字段映射到我的合并表中时,我需要让它们为空,但是如果我将“外部”行作为选项进行模式匹配,它会阻止我。

解决方法

TL;DR

Spark 的数据集 API 为整个不可连接的“外部”记录返回空值。

... 不是字段级别的空值(如 SQL)或您可以简单匹配的记录或字段级别的选项。 :( .... 简而言之,joinWith 函数依赖于类型签名,并随意返回一个无声的 null 而不是规定的 case 类(或类型)。

解决方案

  • 在类型签名和模式匹配中,表现得好像它会返回一个有效的记录(例如,不要在 Option 中包装模式匹配,因为这与它们的类型签名不匹配)。
  • 但是,在实际使用该字段之前,创建一个新的 val,将可能为空的“外部”记录包装在 Option 中。

示例

这应该是有效的 Scala 代码(假设您已经设置了 spark 上下文并定义了数据集和案例类),但我还没有对其进行测试。

val joinedDs = leftDs
  .joinWith(rightDs,leftDs("key_field") === rightDs("key_field"),"left")
  .map { x => {
    val l = x._1 // alias the left side
    val r = Option(x._2) // alias and Optionalize the right side
    ResultCaseClass( // defined elsewhere
      l.key_field,l.non_key_field,// may be an Option,or not - just as it appears in leftDs
      option_field = r.map(_.regular_field),// turns nonOption rightDS to Option
      reoption_field = r.flatMap(_.already_an_option) // flatten rightDs Option
    )
  }}