scala – 丰富SparkContext,而不会导致序列化问题

我正在尝试使用Spark来处理来自HBase表的数据. This blog post给出了如何使用NewHadoopAPI从任何Hadoop InputFormat读取数据的示例.

我做了什么

由于我需要多次这样做,所以我试图使用implicits来丰富SparkContext,这样我就可以从HBase中给定的一列列中获得一个RDD.我写了以下帮手:

trait HBaseReadSupport {
  implicit def toHBaseSC(sc: SparkContext) = new HBaseSC(sc)

  implicit def bytes2string(bytes: Array[Byte]) = new String(bytes)
}


final class HBaseSC(sc: SparkContext) extends Serializable {
  def extract[A](data: Map[String,List[String]],result: Result,interpret: Array[Byte] => A) =
    data map { case (cf,columns) =>
      val content = columns map { column =>
        val cell = result.getColumnLatestCell(cf.getBytes,column.getBytes)

        column -> interpret(CellUtil.cloneValue(cell))
      } toMap

      cf -> content
    }

  def makeConf(table: String) = {
    val conf = HBaseConfiguration.create()

    conf.setBoolean("hbase.cluster.distributed",true)
    conf.setInt("hbase.client.scanner.caching",10000)
    conf.set(TableInputFormat.INPUT_TABLE,table)

    conf
  }

  def hbase[A](table: String,data: Map[String,List[String]])
    (interpret: Array[Byte] => A) =

    sc.newAPIHadoopRDD(makeConf(table),classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result]) map { case (key,row) =>
        Bytes.toString(key.get) -> extract(data,row,interpret)
      }

}

它可以像

val rdd = sc.hbase[String](table,Map(
  "cf" -> List("col1","col2")
))

在这种情况下,我们得到一个RDD(String,Map [String,String]]),其中第一个组件是行键,第二个是一个地图,其关键是列系列,列,其内容是单元格值.

哪里失败

不幸的是,我的工作似乎是对sc的引用,它本身不能通过设计进行序列化.当我经营工作时,我得到什么

Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)

我可以删除助手类,并在我的工作中使用相同的逻辑内联,一切正常.但是我想得到一些可以重用的东西,而不是一遍又一遍地编写相同的样板.

顺便问一下,这个问题不是隐含的,即使使用sc的功能也表现出同样的问题.

为了比较,以下帮助读取TSV文件(我知道它是坏的,因为它不支持引用等等,没关系)似乎工作正常:

trait TsvReadSupport {
  implicit def toTsvRDD(sc: SparkContext) = new TsvRDD(sc)
}

final class TsvRDD(val sc: SparkContext) extends Serializable {
  def tsv(path: String,fields: Seq[String],separator: Char = '\t') = sc.textFile(path) map { line =>
    val contents = line.split(separator).toList

    (fields,contents).zipped.toMap
  }
}

How can I encapsulate the logic to read rows from HBase without unintentionally capturing the SparkContext?

解决方法

只需添加@transient注释到sc变量:

final class HBaseSC(@transient val sc: SparkContext) extends Serializable {
  ...
}

并确保sc不在提取功能中使用,因为它不会在工作人员上使用.

如果有必要从分布式计算中访问Spark上下文,则可以使用rdd.context函数

val rdd = sc.newAPIHadoopRDD(...)
rdd map {
  case (k,v) => 
    val ctx = rdd.context
    ....
}

相关文章

共收录Twitter的14款开源软件,第1页Twitter的Emoji表情 Tw...
Java和Scala中关于==的区别Java:==比较两个变量本身的值,即...
本篇内容主要讲解“Scala怎么使用”,感兴趣的朋友不妨来看看...
这篇文章主要介绍“Scala是一种什么语言”,在日常操作中,相...
这篇文章主要介绍“Scala Trait怎么使用”,在日常操作中,相...
这篇文章主要介绍“Scala类型检查与模式匹配怎么使用”,在日...