Why do I get an AssertionError when creating a DeviceQuantileDMatrix?

Problem

I am using the following code to load a CSV file into dask_cudf and then create a DaskDeviceQuantileDMatrix for XGBoost, and it raises an error:

from dask_cuda import LocalCUDACluster
from dask.distributed import Client
from dask.utils import parse_bytes
import dask_cudf
import xgboost as xgb

cluster = LocalCUDACluster(rmm_pool_size=parse_bytes("9GB"), n_workers=5, threads_per_worker=1)
client = Client(cluster)

ddb = dask_cudf.read_csv('/home/ubuntu/dataset.csv')
xTrain = ddb.iloc[:, 20:]
yTrain = ddb.iloc[:, 1:2]

dTrain = xgb.dask.DaskDeviceQuantileDMatrix(client=client, data=xTrain, label=yTrain)

The error:

---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
<ipython-input-16-2cca13ac807f> in <module>
----> 1 dTrain = xgb.dask.DaskDeviceQuantileDMatrix(client=client,label=yTrain)

/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/xgboost/dask.py in __init__(self,client,data,label,missing,weight,base_margin,label_lower_bound,label_upper_bound,feature_names,feature_types,max_bin)
    508                          label_upper_bound=label_upper_bound,
    509                          feature_names=feature_names,
--> 510                          feature_types=feature_types)
    511         self.max_bin = max_bin
    512         self.is_quantile = True

/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/xgboost/dask.py in __init__(self,feature_types)
    229                                  base_margin=base_margin,
    230                                  label_lower_bound=label_lower_bound,
--> 231                                  label_upper_bound=label_upper_bound)
    232 
    233     def __await__(self):

/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/client.py in sync(self,func,asynchronous,callback_timeout,*args,**kwargs)
    835         else:
    836             return sync(
--> 837                 self.loop,callback_timeout=callback_timeout,**kwargs
    838             )
    839 

/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/utils.py in sync(loop,**kwargs)
    338     if error[0]:
    339         typ,exc,tb = error[0]
--> 340         raise exc.with_traceback(tb)
    341     else:
    342         return result[0]

/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/utils.py in f()
    322             if callback_timeout is not None:
    323                 future = asyncio.wait_for(future,callback_timeout)
--> 324             result[0] = yield future
    325         except Exception as exc:
    326             error[0] = sys.exc_info()

/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/xgboost/dask.py in map_local_data(self,weights,label_upper_bound)
    311 
    312         for part in parts:
--> 313             assert part.status == 'finished'
    314 
    315         # Preserving the partition order for prediction.

AssertionError: 

I don't know what causes this error, since it says nothing beyond "AssertionError". My dataset is too large to fit on a single GPU, so I use dask_cudf to split it while reading it from disk and then feed it directly into the data structure XGBoost requires. I'm not sure whether this is a dask_cudf problem or an XGBoost problem.
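For context, the assert that fails is in xgboost's `map_local_data`, which checks that every dask partition future reached status `'finished'`; a future whose task raised or was cancelled carries a different status (`'error'` or `'cancelled'`), and the bare `assert` swallows that detail. A minimal stdlib sketch of the same "completed but failed" distinction, using `concurrent.futures` as an analogy (not dask's own Future API):

```python
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=1) as ex:
    ok = ex.submit(lambda: 1 + 1)    # task that succeeds
    bad = ex.submit(lambda: 1 / 0)   # task that raises ZeroDivisionError

# Both futures are "done", but only one finished successfully; dask would
# report the second one with status 'error' rather than 'finished',
# which is exactly what trips `assert part.status == 'finished'`.
print(ok.result())                   # 2
print(bad.exception() is not None)   # True
```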

New error when I persist and use `wait`:

distributed.core - ERROR - 2154341415 exceeds max_bin_len(2147483647)
Traceback (most recent call last):
  File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/core.py",line 563,in handle_stream
    handler(**merge(extra,msg))
  File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/scheduler.py",line 2382,in update_graph_hlg
    dsk,dependencies,annotations = highlevelgraph_unpack(hlg)
  File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/protocol/highlevelgraph.py",line 161,in highlevelgraph_unpack
    hlg = loads_msgpack(*dumped_hlg)
  File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/protocol/core.py",line 223,in loads_msgpack
    payload,object_hook=msgpack_decode_default,use_list=False,**msgpack_opts
  File "msgpack/_unpacker.pyx",line 195,in msgpack._cmsgpack.unpackb
ValueError: 2154341415 exceeds max_bin_len(2147483647)
tornado.application - ERROR - Exception in callback <bound method Client._heartbeat of <Client: 'tcp://127.0.0.1:43507' processes=4 threads=4,memory=49.45 GB>>
Traceback (most recent call last):
  File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/tornado/ioloop.py",line 905,in _run
    return self.callback()
  File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/client.py",line 1177,in _heartbeat
    self.scheduler_comm.send({"op": "heartbeat-client"})
  File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/batched.py",line 136,in send
    raise CommClosedError
distributed.comm.core.CommClosedError
distributed.core - ERROR - Exception while handling op register-client
Traceback (most recent call last):
  File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/core.py",line 491,in handle_comm
    result = await result
  File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/scheduler.py",line 3247,in add_client
    await self.handle_stream(comm=comm,extra={"client": client})
  File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/core.py",in msgpack._cmsgpack.unpackb
ValueError: 2154341415 exceeds max_bin_len(2147483647)
tornado.application - ERROR - Exception in callback functools.partial(<function TCPServer._handle_connection.<locals>.<lambda> at 0x7f7058e87f80>,<Task finished coro=<BaseTCPListener._handle_stream() done,defined at /usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/comm/tcp.py:459> exception=ValueError('2154341415 exceeds max_bin_len(2147483647)')>)
Traceback (most recent call last):
  File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/tornado/ioloop.py",line 741,in _run_callback
    ret = callback()
  File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/tornado/tcpserver.py",line 331,in <lambda>
    gen.convert_yielded(future),lambda f: f.result()
  File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/comm/tcp.py",line 476,in _handle_stream
    await self.comm_handler(comm)
  File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/core.py",in msgpack._cmsgpack.unpackb
ValueError: 2154341415 exceeds max_bin_len(2147483647)
---------------------------------------------------------------------------
CancelledError                            Traceback (most recent call last)
<ipython-input-9-e2b8073da6e7> in <module>
      1 from dask.distributed import wait
----> 2 wait([xTrainDC,yTrainDC])

/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/client.py in wait(fs,timeout,return_when)
   4257     """
   4258     client = default_client()
-> 4259     result = client.sync(_wait,fs,timeout=timeout,return_when=return_when)
   4260     return result
   4261 

/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/client.py in sync(self,callback_timeout)
--> 324             result[0] = yield future
    325         except Exception as exc:
    326             error[0] = sys.exc_info()

/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

CancelledError: 

Solution

My guess is that something in the dask_cudf.read_csv('/home/ubuntu/dataset.csv') step is failing, leaving the underlying futures in a state other than finished. Does the CSV fit into the memory of the GPUs you are using? Could you try the code below and report the error message?

This tells dask to compute the results of the read_csv and iloc calls and to wait for the distributed results to finish before moving on to creating the DMatrix.

from dask_cuda import LocalCUDACluster
from dask.distributed import Client, wait
from dask.utils import parse_bytes
import dask_cudf
import xgboost as xgb

cluster = LocalCUDACluster(rmm_pool_size=parse_bytes("9GB"), n_workers=5, threads_per_worker=1)
client = Client(cluster)

ddb = dask_cudf.read_csv('/home/ubuntu/dataset.csv')
xTrain = ddb.iloc[:, 20:].persist()
yTrain = ddb.iloc[:, 1:2].persist()
wait([xTrain, yTrain])

dTrain = xgb.dask.DaskDeviceQuantileDMatrix(client=client, data=xTrain, label=yTrain)
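On the second error you hit: `2154341415 exceeds max_bin_len(2147483647)` comes from msgpack, which distributed uses to serialize the task graph. msgpack caps a single binary payload at a 32-bit length, so if one serialized buffer exceeds 2 GiB the scheduler cannot unpack it; that usually means large data got embedded directly in the graph. A quick check of where the number in the error comes from:

```python
# msgpack encodes a single bin (binary payload) with at most a 32-bit
# signed length, so no buffer above 2**31 - 1 bytes can be packed.
MAX_BIN_LEN = 2**31 - 1
print(MAX_BIN_LEN)                # 2147483647, the limit in the error
print(2154341415 - MAX_BIN_LEN)   # 6857768, i.e. ~6.5 MiB over the limit
```

One possible workaround (an assumption, not verified against your setup) is to read the CSV in smaller partitions so that no single partition's serialized form approaches the 2 GiB limit; dask_cudf.read_csv has taken a chunk-size argument (`chunksize` or `blocksize`, depending on the version), so check your installed version's signature.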
