为什么 dask.delayed 在使用 networkx 时比串行代码花费的时间更长?

问题描述

我想使用 glPopMatrix 的并行计算来加速函数 my_func() 的执行。

在 3 维循环中,dask.delayedmy_func()(本质上是从循环外的文件加载的 iris.cube.Cube)中提取一个值,并根据该值,使用 dask.array 创建一个随机网络并找到从节点 0 到节点 16 的最短路径。每个数组点的计算是独立的。

  1. 为什么执行并行代码(5.43 秒)比串行代码(2.94 秒)需要更长的时间?
  2. 是否有更好的方法来加快速度?networkxdask 或其他什么?

这是一个可重现的例子:

multiprocessing

序列号:

import random

import dask
import iris
import networkx as nx
from dask import delayed
from dask.distributed import Client
from networkx.generators.random_graphs import gnp_random_graph

# Input
fname = iris.sample_data_path("uk_hires.pp")  # https://anaconda.org/conda-forge/iris-sample-data
temp_ptntl = iris.load_cube(fname,"air_potential_temperature")[-1,...]  # last time step only
# Dimensions
zs = temp_ptntl.coord("model_level_number").points
lats = temp_ptntl.coord("grid_latitude").points
lons = temp_ptntl.coord("grid_longitude").points

def my_func(iz,iy,ix):
    constraint = iris.Constraint(model_level_number=iz,grid_latitude=iy,grid_longitude=ix)
    temp_virt = temp_ptntl.extract(constraint) * (1 + 0.61 * 0.04)
    if float(temp_virt.data) > 295:
        G = nx.gnp_random_graph(30,0.2,seed=random.randint(1,10),directed=True)
        distance,path = nx.single_source_dijkstra(G,source=0,target=16)
    else:
        pass
    return temp_virt,distance,path

使用 %%time results_serial = [] # serial code for iz in zs: for iy in lats[0:5]: for ix in lons[0:2]: results_serial.append(my_func(iz,ix)) >>> cpu times: user 2.94 s,sys: 44 ms,total: 2.99 s >>> Wall time: 2.94 s

dask

解决方法

dask 会有一些开销,所以在小样本上,它表现不佳并不罕见。当我尝试通过更改为 for iy in lats[0:15]: 来增加计算次数时,我看到串行计算需要 10 秒,而 dask 在 4 秒内完成。

(还有函数的序列化,可能需要一些时间,但只适用于第一次将函数发送给worker)