问题描述
我通过 EMR 在 Spark 上运行代码,所以有一个驱动程序节点和 X 个工作程序节点。我需要在集群中每个节点的磁盘上放置一个 txt
文件,以便稍后在 Spark 作业需要时打开它。
我的设置目前如下所示:
def distributeData(): Unit = {
val uri = writeFiletoHdfs("testFile.txt",someContent)
distributeDataToWorkerNodes(uri)
}
辅助方法:
def writeFiletoHdfs(fileName: String,content: Content): URI = {
val path = new Path(s"path/to/file/$fileName")
val os: FSDataOutputStream = hdfs.create(path)
val writer = new OutputStreamWriter(os)
content.foreach(data => {
writer.write(data)
}
writer.close()
path.toUri
}
def distributeDataToWorkerNodes(uri: URI): Unit = {
val job = configureJob(uri)
runJob(job)
}
def configureJob(uri: URI): Job = {
val job = Job.getInstance(hdfs.getConf,"distributionJob")
// job.setMapperClass(classOf[MyMapper]) Don't think I need this?
job.addCacheFile(uri)
job
}
def runJob(job: Job): Unit = {
if (!job.waitForCompletion(true)) throw new RuntimeException("Failed job")
}
几个问题:
-
通常使用 distributedCache 以便 map/reduce 任务可以访问这些文件来完成它们的工作。我不需要映射器类做任何事情 - 我只需要这个
Job
逻辑将txt
文件放在每个工作节点上。我需要设置自定义映射器类吗? -
从阅读文档来看,一旦 mapper/reducer 任务完成文件,作业似乎会清理所有缓存的文件。如果我想将这些文件保留在磁盘上,是否可以只覆盖
cleanup
方法以便映射器不执行任何操作? -
添加到
job.addCacheFile(uri)
的 uri 是否必须带有 hdfs 前缀?hdfs://path/to/file/testFile.txt
。到目前为止,我正在返回path.toUri
,这使它看起来好像只会返回path/to/file/testFile.txt
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)