Problem description
I want to read many files and, for each column, build a pair (Array[String], Index), where the index is "file-i" and i is a column counter local to each file.
For example:
tableA.txt:
00 01 02
10 11 12
tableB.txt:
03 04
13 14
Goal (each column paired with its file name and index):
RDD[(Array[String], String)]: (Array("00","10"),"tableA.txt-0"), (Array("01","11"),"tableA.txt-1"), (Array("02","12"),"tableA.txt-2"), (Array("03","13"),"tableB.txt-0"), (Array("04","14"),"tableB.txt-1")
My code:
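import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.NewHadoopRDD

val fc = classOf[TextInputFormat]
val kc = classOf[LongWritable]
val vc = classOf[Text]
// TextInputFormat emits one record per line: key = byte offset, value = the line's text
val text = sc.newAPIHadoopFile(path, fc, kc, vc, sc.hadoopConfiguration)
// Attach the path of the file each record came from
val linesWithFileNames = text.asInstanceOf[NewHadoopRDD[LongWritable, Text]]
  .mapPartitionsWithInputSplit((inputSplit, iterator) => {
    val file = inputSplit.asInstanceOf[FileSplit]
    iterator.map(tup => (file.getPath, tup._2))
  })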
// Intended: split a file into rows, transpose the rows into columns, label each column "file-i"
val columnsData = linesWithFileNames.flatMap(p => {
  val filename = p._1.toString
  val lines = p._2.toString.split("\n")
  lines.map(l => l.split(" "))
    .toSeq.transpose.zipWithIndex
    .map(pair => (pair._1, filename + "-" + pair._2.toString))
})
My incorrect result:
("00",("10","tableA.txt-0")...
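The per-line behaviour of TextInputFormat is what breaks the transpose. As a rough illustration, here is a plain-Scala stand-in (no Spark) for the flatMap body above, fed records the way TextInputFormat delivers them for tableA.txt: one line per record. `PerLineDemo`, `body` and `records` are hypothetical names, the inner split is converted with `.toSeq` so the sketch compiles on its own, and it is not meant to reproduce the exact printout above.

```scala
object PerLineDemo {
  // Hypothetical copy of the flatMap body, applied to ONE record (filename, text).
  // With TextInputFormat, `text` is a single line, so split("\n") yields one row.
  def body(filename: String, text: String): Seq[(Seq[String], String)] = {
    val lines = text.split("\n")
    lines.map(l => l.split(" ").toSeq)
      .toSeq.transpose.zipWithIndex
      .map(pair => (pair._1, filename + "-" + pair._2.toString))
  }

  // Assumed records for tableA.txt as TextInputFormat would deliver them: one per line.
  val records: Seq[(String, String)] = Seq(
    ("tableA.txt", "00 01 02"),
    ("tableA.txt", "10 11 12")
  )

  // Each record is transposed on its own, so every "column" holds a single cell
  // and the labels tableA.txt-0..2 repeat once per line.
  val result: Seq[(Seq[String], String)] = records.flatMap { case (f, t) => body(f, t) }
}
```

Because each row is transposed in isolation, the cells of one column never meet, which is why the records have to be whole files rather than single lines.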
Solution
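A simple way to get what you want is to use wholeTextFiles, which produces an RDD that pairs each file path with that file's entire content. Your version cannot work because newAPIHadoopFile with TextInputFormat emits one record per line, so p._2 never holds more than one row and there is nothing to transpose. With whole files in hand, split each file into rows, transpose the rows into columns, and label each column with the file name and its index.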
The code looks like this:
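import org.apache.spark.rdd.RDD

val result: RDD[(Array[String], String)] = sc
  .wholeTextFiles("data1")              // (path, whole file content) pairs
  .flatMap { case (path, content) =>
    content
      .split("\\n")
      .map(_.trim)
      .filter(_.nonEmpty)               // skip blank lines
      .map(_.split("\\s+"))             // rows of cells
      .transpose                        // rows -> columns; assumes all rows have the same width
      .zipWithIndex
      .map { case (column, i) => (column, path.split("/").last + "-" + i) }
  }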
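To check the per-file logic without a cluster, the same flatMap body can be run on plain Scala collections. This is a sketch under assumptions: `ColumnPairs`, `columnsOf` and the in-memory `files` list are hypothetical stand-ins for the (path, content) pairs that wholeTextFiles would return for the two example tables.

```scala
object ColumnPairs {
  // Hypothetical helper: the per-file body of the flatMap, without Spark.
  // `content` is a whole file, as wholeTextFiles delivers it.
  def columnsOf(path: String, content: String): Seq[(Array[String], String)] = {
    val name = path.split("/").last                  // "data1/tableA.txt" -> "tableA.txt"
    val rows = content.split("\n")
      .map(_.trim)
      .filter(_.nonEmpty)                            // skip blank lines
      .map(_.split("\\s+"))                          // rows of cells
    rows.transpose.toSeq.zipWithIndex                // rows -> columns, numbered per file
      .map { case (col, i) => (col, s"$name-$i") }
  }

  // Assumed in-memory stand-in for sc.wholeTextFiles("data1").
  val files: Seq[(String, String)] = Seq(
    ("data1/tableA.txt", "00 01 02\n10 11 12\n"),
    ("data1/tableB.txt", "03 04\n13 14\n")
  )

  val result: Seq[(Array[String], String)] = files.flatMap { case (p, c) => columnsOf(p, c) }
}
```

On the example data this yields the five (column, "file-i") pairs from the goal, with the counter restarting at 0 for each file. Array.transpose expects rows of equal length, so ragged input needs padding or filtering first.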