问题描述
首先,如果我在这里遗漏了这个问题的变体,我深表歉意。我试图阅读有关在 R 中并行化 for 循环的类似问题,但我找不到任何适合我的特定情况的内容。
我正在寻找的概述是并行化从目录中读取 tar.gz 文件的循环,提取所需的 NetCDF 数据,格式化/处理数据,并将其写出到 CSV 文件。我想并行运行它们的原因是每个 tar.gz 文件都相当大(几乎 8GB)。我当前的脚本一个一个地遍历每个 tar.gz。这显然需要一点时间,所以我想通过将每个 tar.gz 文件分发给 cpu 上的单个进程来加快进程。
我知道 doParallel 和 foreach 库似乎允许这种情况发生。不幸的是,我正在努力为我的情况实施逻辑。我将提供我的原始脚本的摘要版本以及我尝试使用 doParallel 的脚本,因为我的脚本有点冗长。请参阅以下内容:
原始脚本
#clear memory
rm(list = ls())
#import needed libraries
library(ncdf4)
library(tidyverse)
library(reshape2)
library(readr)
#set working directory
setwd('path_to_working_directory')
#create path to iterate through in the for loop
folder <- getwd()
#create loop to iterate through folder and process data
for (x in dir(folder)) {
print('Extracting from tar file.')
tar.list <- untar(x,list = TRUE)
untar(x,files = tar.list[c(2,5)],exdir = folder) #extract needed files from tar file
print(paste('Extracted',tar.list[2]))
print(paste('Extracted',tar.list[5]))
unlink(x) #delete tar.gz file
print(paste('Removed',x))
print('Switching to NetCDF Process...')
### NetCDF CODE PORTION HERE ###
#write to CSV and delete extracted files
write.csv(processed_file,file = 'name_of_file.csv',row.names = FALSE)
unlink(tar.list[c(2,5)])
}
print('Complete.')
可以看出非常简单的脚本。由于我一次处理目录中的五个 tar.gz 文件,因此我想利用 cpu 上的各个内核。我的 cpu 有八个内核,但为此我只想使用五个内核——每个文件一个。下面是我尝试并行执行与上述相同任务的代码:
并行脚本
#clear memory
rm(list = ls())
library(doParallel) #this package also contains foreach functionality
#set working directory
setwd('path_to_working_directory')
#create path to iterate through in the for loop
folder <- getwd()
registerDoParallel(cl <- makeCluster(5))
results <- foreach(x = dir(folder),.packages = c('ncdf4','tidyverse','reshape2','readr')) %dopar% {
print('Extracting from tar file.')
#list contents of tar file
tar.list <- untar(x,list = TRUE)
untar(x,files = tar.list[c(2,exdir = folder)
print(paste('Extracted',tar.list[2]))
print(paste('Extracted',tar.list[5]))
unlink(x) #delete tar.gz file
print(paste('Removed',x))
print('Switching to NetCDF Process...')
### NetCDF CODE PORTION HERE ###
#write to CSV and remove extracted files
write.csv(processed_file,file = 'file_name_here.csv',row.names = FALSE)
unlink(tar.list[c(2,5)])
}
stopCluster(cl)
当我运行这个脚本时,我可以在 Windows 任务管理器中看到多个生成的进程;但是,我没有看到任何单个进程的任何磁盘活动。显然,我做错了什么,希望有人指出正确的方向。
感谢您的帮助!
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)