禁用 xarray 在 dask 任务中自动使用 dask

问题描述

背景

我正在使用 dask 来管理数以万计,有时是数十万个作业,每个作业都涉及读取 zarr 数据、以某种方式转换数据和写出输出(每个作业一个输出)。我正在使用带有 dask 网关的 pangeo/daskhub 样式的 jupyterhub kubernetes 集群。

数据无法对齐到一个巨大的 dask-backed xarray DataArray,相反,我们常见的使用模式是简单地使用 dask 分布式的 Client.map函数映射到任务。每个任务都可以在内存中运行。

问题在于,对于包括读取 zarr 数组在内的某些操作,xarray 会自动使用集群调度程序来调度每个 I/O 操作,即使该操作是由远程工作者调用的。这使调度程序必须管理的任务数量成倍增加,有时是一个很大的因素。如果 zarr 数组有很多块,有时 dask 调度程序会尝试分配读取,从而导致集群内出现大量网络流量,当许多任务尝试排队以读取它们的块时,这可能会使进度停止全部同时进行。

这里的正确答案完全有可能是“不要这样做”,例如对于如此庞大而复杂的工作,请使用 Argo 或 Kubeflow 之类的工具。但我想看看是否有人对如何使用 dask 很好地进行工作有想法。

问题

我的问题基本上是,在 dask.distributed 任务中运行时,是否有可能阻止 xarray(或其他支持本机 dask 的库)使用集群的调度程序。

我认为理想的情况可能是这样的:

def mappable_task(args):
    input_fp,output_fp = args

    # goal would be for all of the code within this block to operate as if the
    # cluster scheduler did not exist,to ensure data is read locally and the
    # additional tasks created by reading the zarr array do not bog down the 
    # cluster scheduler.
    with dd.disable_scheduler():
        
        # in our workflow we're reading/writing to google cloud storage using
        # gcsfs.GCSFileSystem.get_mapper
        # https://gcsfs.readthedocs.io/en/latest/api.html
        # Also,we sometimes will only be reading a small portion of the
        # data,or are combining multiple datasets. Just noting that this
        # may involve many scheduled operations per `mappable_task` call
        ds = xr.open_zarr(input_fp).load()
        
        # some long-running operation on the data,which depending on our
        # use case has run from nonlinear transforms to geospatial ops to
        # calling hydrodynamics models
        res = ds * 2

        res.to_zarr(output_fp)

def main():
    JOB_SIZE = 10_000
    jobs = [(f'/infiles/{i}.zarr',f'/outfiles/{i}.zarr') for i in range(JOB_SIZE)]
    client = dd.Client()
    futures = client.map(mappable_task,jobs)
    dd.wait(futures)

我不确定这是否会涉及更改 xarray、zarr 或 dd.get_client() 的行为或其他内容

MRE

可以调整以上内容以获得可测试的示例。目标是看不到除了主映射函数之外的任何任务。我在 jupyterlab ipython notebook 中运行了以下内容,并使用 dask-labextension 观看任务(调度程序仪表板显示相同的结果)

进口

import xarray as xr
import dask.distributed as dd
import numpy as np
import os
import datetime
import shutil

测试文件设置

shutil.rmtree('infiles',ignore_errors=True)
shutil.rmtree('outfiles',ignore_errors=True)

os.makedirs('infiles',exist_ok=True)
os.makedirs('outfiles',exist_ok=True)

# create two Zarr stores,each with 1000 chunks. This isn't an uncommon
# structure,though each chunk would normally have far more data
for i in range(2):
    ds = xr.Dataset(
        {'var1': (('dim1','dim2',),np.random.random(size=(1000,100)))},coords={'dim1': np.arange(1000),'dim2': np.arange(100)},).chunk({'dim1': 1})
    
    ds.to_zarr(f'infiles/data_{i}.zarr')

函数定义

def mappable_task(args):
    input_fp,output_fp = args
        
    # in our workflow we're reading/writing to google cloud storage using
    # gcsfs.GCSFileSystem.get_mapper
    # https://gcsfs.readthedocs.io/en/latest/api.html
    ds = xr.open_zarr(input_fp).load()

    # some long-running operation on the data,which depending on our
    # use case has run from nonlinear transforms to geospatial ops to
    # calling hydrodynamics models
    res = ds * 2

    res.to_zarr(output_fp)

创建一个客户端并查看仪表板

client = dd.Client()
client

映射作业

JOB_SIZE = 2
jobs = [(f'infiles/data_{i}.zarr',f'outfiles/out_{i}.zarr') for i in range(JOB_SIZE)]

futures = client.map(mappable_task,jobs)
dd.wait(futures);

清理(如果再次运行)

shutil.rmtree('outfiles',ignore_errors=True)
os.makedirs('outfiles',exist_ok=True)

# refresh the client (in case of running multiple times)
client.restart()

全面清理

shutil.rmtree('infiles',ignore_errors=True)
client.close();

请注意,尽管只有两个作业,但仍安排了数千个任务。

我正在使用 conda 环境(以及许多其他软件包):

dask                      2.30.0                     py_0    conda-forge
dask-gateway              0.9.0            py38h578d9bd_0    conda-forge
dask-labextension         3.0.0                      py_0    conda-forge
jupyter-server-proxy      1.5.0                      py_0    conda-forge
jupyter_client            6.1.7                      py_0    conda-forge
jupyter_core              4.7.0            py38h578d9bd_0    conda-forge
jupyter_server            1.1.3            py38h578d9bd_0    conda-forge
jupyter_telemetry         0.1.0              pyhd8ed1ab_1    conda-forge
jupyterhub                1.2.2            py38h578d9bd_0    conda-forge
jupyterhub-base           1.2.2            py38h578d9bd_0    conda-forge
jupyterlab                2.2.9                      py_0    conda-forge
jupyterlab_server         1.2.0                      py_0    conda-forge
jupyterlab_widgets        1.0.0              pyhd8ed1ab_1    conda-forge
kubernetes                1.18.8                        0    conda-forge
kubernetes-client         1.18.8               haa36a5b_0    conda-forge
kubernetes-node           1.18.8               haa36a5b_0    conda-forge
kubernetes-server         1.18.8               haa36a5b_0    conda-forge
nb_conda_kernels          2.3.1            py38h578d9bd_0    conda-forge
nbclient                  0.5.1                      py_0    conda-forge
nodejs                    15.2.1               h914e61d_0    conda-forge
notebook                  6.1.6            py38h578d9bd_0    conda-forge
numpy                     1.19.4           py38hf0fd68c_1    conda-forge
pandas                    1.1.5            py38h51da96c_0    conda-forge
python                    3.8.6           h852b56e_0_cpython    conda-forge
python-kubernetes         11.0.0           py38h32f6830_0    conda-forge
xarray                    0.16.2             pyhd8ed1ab_0    conda-forge
zarr                      2.6.1              pyhd8ed1ab_0    conda-forge

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...