将 DistributedCache 文件保存在每个节点的磁盘上,无需工作

问题描述

我通过 EMR 在 Spark 上运行代码,所以有一个驱动程序节点和 X 个工作程序节点。我需要在集群中每个节点的磁盘上放置一个 txt 文件,以便稍后在 Spark 作业需要时打开它。

我的设置目前如下所示:

def distributeData(): Unit = {
  val uri = writeFiletoHdfs("testFile.txt",someContent)
  distributeDataToWorkerNodes(uri)
}

辅助方法

def writeFiletoHdfs(fileName: String,content: Content): URI = {
  val path = new Path(s"path/to/file/$fileName")
  val os: FSDataOutputStream = hdfs.create(path)
  val writer = new OutputStreamWriter(os)
  content.foreach(data => {
    writer.write(data)
  }
  writer.close()
  path.toUri
}

def distributeDataToWorkerNodes(uri: URI): Unit = {
  val job = configureJob(uri)
  runJob(job)
}

def configureJob(uri: URI): Job = {
  val job = Job.getInstance(hdfs.getConf,"distributionJob")
  // job.setMapperClass(classOf[MyMapper]) Don't think I need this?
  job.addCacheFile(uri)
  job
}

def runJob(job: Job): Unit = {
  if (!job.waitForCompletion(true)) throw new RuntimeException("Failed job")
}

几个问题:

  1. 通常使用 distributedCache 以便 map/reduce 任务可以访问这些文件来完成它们的工作。我不需要映射器类做任何事情 - 我只需要这个 Job 逻辑将 txt 文件放在每个工作节点上。我需要设置自定义映射器类吗?

  2. 从阅读文档来看,一旦 mapper/reducer 任务完成文件,作业似乎会清理所有缓存的文件。如果我想将这些文件保留在磁盘上,是否可以只覆盖 cleanup 方法以便映射器不执行任何操作?

  3. 添加job.addCacheFile(uri) 的 uri 是否必须带有 hdfs 前缀? hdfs://path/to/file/testFile.txt。到目前为止,我正在返回 path.toUri,这使它看起来好像只会返回 path/to/file/testFile.txt

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)