我正在尝试使用Spark来处理来自HBase表的数据.
This blog post给出了如何使用NewHadoopAPI从任何Hadoop InputFormat读取数据的示例.
我做了什么
由于我需要多次这样做,所以我试图使用implicits来丰富SparkContext,这样我就可以从HBase中给定的一列列中获得一个RDD.我写了以下帮手:
trait HBaseReadSupport { implicit def toHBaseSC(sc: SparkContext) = new HBaseSC(sc) implicit def bytes2string(bytes: Array[Byte]) = new String(bytes) } final class HBaseSC(sc: SparkContext) extends Serializable { def extract[A](data: Map[String,List[String]],result: Result,interpret: Array[Byte] => A) = data map { case (cf,columns) => val content = columns map { column => val cell = result.getColumnLatestCell(cf.getBytes,column.getBytes) column -> interpret(CellUtil.cloneValue(cell)) } toMap cf -> content } def makeConf(table: String) = { val conf = HBaseConfiguration.create() conf.setBoolean("hbase.cluster.distributed",true) conf.setInt("hbase.client.scanner.caching",10000) conf.set(TableInputFormat.INPUT_TABLE,table) conf } def hbase[A](table: String,data: Map[String,List[String]]) (interpret: Array[Byte] => A) = sc.newAPIHadoopRDD(makeConf(table),classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result]) map { case (key,row) => Bytes.toString(key.get) -> extract(data,row,interpret) } }
它可以像
val rdd = sc.hbase[String](table,Map( "cf" -> List("col1","col2") ))
在这种情况下,我们得到一个RDD(String,Map [String,String]]),其中第一个组件是行键,第二个是一个地图,其关键是列系列,列,其内容是单元格值.
哪里失败
不幸的是,我的工作似乎是对sc的引用,它本身不能通过设计进行序列化.当我经营工作时,我得到什么
Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
我可以删除助手类,并在我的工作中使用相同的逻辑内联,一切正常.但是我想得到一些可以重用的东西,而不是一遍又一遍地编写相同的样板.
顺便问一下,这个问题不是隐含的,即使使用sc的功能也表现出同样的问题.
为了比较,以下帮助读取TSV文件(我知道它是坏的,因为它不支持引用等等,没关系)似乎工作正常:
trait TsvReadSupport { implicit def toTsvRDD(sc: SparkContext) = new TsvRDD(sc) } final class TsvRDD(val sc: SparkContext) extends Serializable { def tsv(path: String,fields: Seq[String],separator: Char = '\t') = sc.textFile(path) map { line => val contents = line.split(separator).toList (fields,contents).zipped.toMap } }
How can I encapsulate the logic to read rows from HBase without unintentionally capturing the SparkContext?