Dask:如何并行化和序列化方法?

问题描述

我正在尝试在 PBS 集群上使用 Dask 并行化类中的方法。

我最大的挑战是这种方法应该并行化一些计算,然后对结果运行进一步的并行计算。当然,这应该分布在集群上,以在其他数据上运行类似的计算......

集群已创建:

cluster = PBSCluster(cores=4,memory=10GB,interface="ib0",queue=queue,processes=1,nanny=False,walltime="02:00:00",shebang="#!/bin/bash",env_extra=env_extra,python=python_bin
                    )
cluster.scale(8)
client = Client(cluster)

我需要分发的类有 2 个单独的步骤,它们必须单独运行,因为 step1 写入一个文件,然后在第二步开始时读取该文件。

我尝试了以下方法,将两个步骤一个接一个地放在一个方法中:

def computations(params):
    my_class(**params).run_step1(run_path)
    my_class(**params).run_step2()

chain = []
for p in params_compute:
    y = dask.delayed(computations)(p)
    chain.append(y)

dask.compute(*chain)

但它不起作用,因为第二步是尝试立即读取文件。 所以我需要想办法在step1之后停止执行。

我试图通过添加一个计算()来强制执行第一步:

def computations(params):
    my_class(**params).run_step1(run_path).compute()
    my_class(**params).run_step2()

但这可能不是一个好主意,因为在运行 dask.compute(*chain) 时,我最终会执行 compute(compute()) .. 这可以解释为什么不执行第二步?

最好的方法是什么?

我应该在第 1 步末尾的某个地方包含一个 persist() 吗?

有关信息,请参阅下面的第 1 步和第 2 步:

def run_step1(self,path_step):          
    preprocess_result = dask.delayed(self.run_preprocess)(path_step)  
    gpu_result = dask.delayed(self.run_gpu)(preprocess_result)
    post_gpu = dask.delayed(self.run_postgpu)(gpu_result) # Write a result file post_gpu.tif
    return post_gpu

def run_step2(self):
    data_file = rio.open(self.outputdir + "/post_gpu.tif").read() #opens the file written at the end of step1
    temp_result1 = self.process(data_file ) 
    final_merge = dask.delayed(self.merging)(temp_result1 )       
    write =dask.delayed(self.write_final)(final_merge )    
    return write  

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)