Unable to read files on a Unix file system with Spark's wholeTextFiles method

Problem description

I am having trouble reading files stored on a Linux file system:

/app/dev/spark/test1.json
/app/dev/spark/test2.json

I am trying to read these files with the SparkContext method wholeTextFiles:

import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

object HdfsHelper extends App {

  // Directory containing the JSON files (one of the path syntaxes tested below)
  val path: String = ...

  val ss: SparkSession = SparkSession.builder().getOrCreate()
  val sc: SparkContext = ss.sparkContext

  // Read every file under `path` as a (file path, file content) pair
  val files = sc.wholeTextFiles(path).collect()

}
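
For reference, wholeTextFiles returns an RDD of (path, content) pairs, so once the call succeeds the collected array could be inspected roughly like this (a minimal sketch; the println is illustrative only):

// Each element is (fully qualified file path, entire file content as a String)
files.foreach { case (fileName, content) =>
  println(s"$fileName -> ${content.length} characters")
}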

I have tested several syntaxes for the path variable, including the following (see the URI-parsing sketch after this list):

file:////app/dev/spark 
file:///app/dev/spark
file://app/dev/spark
file:/app/dev/spark
/app/dev/spark
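
To see how each candidate string is interpreted, the following sketch (pasteable into the Scala REPL or spark-shell; it only uses java.net.URI and is not part of the original program) prints the scheme, authority and path that Java parses out of each form. Notably, in file://app/dev/spark the segment app becomes the URI authority rather than part of the path:

import java.net.URI

Seq(
  "file:////app/dev/spark",
  "file:///app/dev/spark",
  "file://app/dev/spark",
  "file:/app/dev/spark",
  "/app/dev/spark"
).foreach { s =>
  // Show which part of each string Java treats as scheme, authority and path
  val u = new URI(s)
  println(s"$s -> scheme=${u.getScheme}, authority=${u.getAuthority}, path=${u.getPath}")
}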

But none of them work; my program always ends with the following error message:

Caused by: java.io.FileNotFoundException: File file:/app/dev/spark/test1.json does not exist
        at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:539)
        at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:752)
        at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:529)
        at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409)
        at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:142)
        at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:346)
        at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
        at org.apache.spark.input.WholeTextFileRecordReader.nextKeyValue(WholeTextFileRecordReader.scala:75)
        at org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader.nextKeyValue(CombineFileRecordReader.java:69)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:230)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at scala.collection.AbstractIterator.to(Iterator.scala:1334)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$15.apply(RDD.scala:990)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$15.apply(RDD.scala:990)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Any ideas? (I am using Spark version 2.4.0.)

Workaround

No effective solution for this problem has been found yet.
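
In the meantime, a minimal diagnostic sketch (an assumption added for illustration, not part of the original question) can be run from spark-shell on the driver node to check whether Hadoop's local FileSystem sees the directory at all for the file:/// form; if this listing already fails, the problem lies with the path or its permissions rather than with wholeTextFiles:

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical check on the driver: list /app/dev/spark through Hadoop's local FileSystem
val fs = FileSystem.get(new URI("file:///app/dev/spark"), new Configuration())
fs.listStatus(new Path("file:///app/dev/spark"))
  .foreach(status => println(status.getPath))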
