问题描述
我正在使用以下代码将 CSV 文件加载到 dask_cudf 中,然后为 XGBoost 创建一个 DaskDeviceQuantileDMatrix,但这会产生错误:
# Start a local Dask cluster of GPU workers, each with a 9 GB RMM memory pool
# and one thread per worker.
# NOTE(review): dask-cuda starts one worker per GPU; confirm n_workers=5 does
# not exceed the number of visible GPUs (the later client log reports 4 processes).
cluster = LocalCUDACluster(rmm_pool_size=parse_bytes("9GB"),n_workers=5,threads_per_worker=1)
client = Client(cluster)
# Read the CSV into a partitioned dask_cudf DataFrame (lazy until computed).
ddb = dask_cudf.read_csv('/home/ubuntu/dataset.csv')
# Features: all columns from position 20 onward.
xTrain = ddb.iloc[:,20:]
# Label: the single column at position 1 (a one-column frame, not a Series).
yTrain = ddb.iloc[:,1:2]
# Building the DMatrix forces the partitions above to be computed; a partition
# that did not finish only surfaces as the bare AssertionError in the traceback below.
dTrain = xgb.dask.DaskDeviceQuantileDMatrix(client=client,data=xTrain,label=yTrain)
错误:
---------------------------------------------------------------------------
AssertionError Traceback (most recent call last)
<ipython-input-16-2cca13ac807f> in <module>
----> 1 dTrain = xgb.dask.DaskDeviceQuantileDMatrix(client=client,label=yTrain)
/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/xgboost/dask.py in __init__(self,client,data,label,missing,weight,base_margin,label_lower_bound,label_upper_bound,feature_names,feature_types,max_bin)
508 label_upper_bound=label_upper_bound,509 feature_names=feature_names,--> 510 feature_types=feature_types)
511 self.max_bin = max_bin
512 self.is_quantile = True
/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/xgboost/dask.py in __init__(self,feature_types)
229 base_margin=base_margin,230 label_lower_bound=label_lower_bound,--> 231 label_upper_bound=label_upper_bound)
232
233 def __await__(self):
/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/client.py in sync(self,func,asynchronous,callback_timeout,*args,**kwargs)
835 else:
836 return sync(
--> 837 self.loop,callback_timeout=callback_timeout,**kwargs
838 )
839
/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/utils.py in sync(loop,**kwargs)
338 if error[0]:
339 typ,exc,tb = error[0]
--> 340 raise exc.with_traceback(tb)
341 else:
342 return result[0]
/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/utils.py in f()
322 if callback_timeout is not None:
323 future = asyncio.wait_for(future,callback_timeout)
--> 324 result[0] = yield future
325 except Exception as exc:
326 error[0] = sys.exc_info()
/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/tornado/gen.py in run(self)
760
761 try:
--> 762 value = future.result()
763 except Exception:
764 exc_info = sys.exc_info()
/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/xgboost/dask.py in map_local_data(self,weights,label_upper_bound)
311
312 for part in parts:
--> 313 assert part.status == 'finished'
314
315 # Preserving the partition order for prediction.
AssertionError:
我不知道这个错误是由什么引起的,因为除了 AssertionError 之外它没有给出任何信息。我的数据集太大,无法读入单个 GPU 的显存,因此从磁盘读取时我使用 dask_cudf 将其拆分,然后直接输入到 XGBoost 所需的数据结构中。我不确定这是 dask_cudf 的问题还是 XGBoost 的问题。
当我在 persist 之后调用 wait 等待结果时,出现了新的错误:
distributed.core - ERROR - 2154341415 exceeds max_bin_len(2147483647)
Traceback (most recent call last):
File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/core.py",line 563,in handle_stream
handler(**merge(extra,msg))
File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/scheduler.py",line 2382,in update_graph_hlg
dsk,dependencies,annotations = highlevelgraph_unpack(hlg)
File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/protocol/highlevelgraph.py",line 161,in highlevelgraph_unpack
hlg = loads_msgpack(*dumped_hlg)
File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/protocol/core.py",line 223,in loads_msgpack
payload,object_hook=msgpack_decode_default,use_list=False,**msgpack_opts
File "msgpack/_unpacker.pyx",line 195,in msgpack._cmsgpack.unpackb
ValueError: 2154341415 exceeds max_bin_len(2147483647)
tornado.application - ERROR - Exception in callback <bound method Client._heartbeat of <Client: 'tcp://127.0.0.1:43507' processes=4 threads=4,memory=49.45 GB>>
Traceback (most recent call last):
File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/tornado/ioloop.py",line 905,in _run
return self.callback()
File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/client.py",line 1177,in _heartbeat
self.scheduler_comm.send({"op": "heartbeat-client"})
File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/batched.py",line 136,in send
raise CommClosedError
distributed.comm.core.CommClosedError
distributed.core - ERROR - Exception while handling op register-client
Traceback (most recent call last):
File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/core.py",line 491,in handle_comm
result = await result
File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/scheduler.py",line 3247,in add_client
await self.handle_stream(comm=comm,extra={"client": client})
File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/core.py",in msgpack._cmsgpack.unpackb
ValueError: 2154341415 exceeds max_bin_len(2147483647)
tornado.application - ERROR - Exception in callback functools.partial(<function TCPServer._handle_connection.<locals>.<lambda> at 0x7f7058e87f80>,<Task finished coro=<BaseTCPListener._handle_stream() done,defined at /usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/comm/tcp.py:459> exception=ValueError('2154341415 exceeds max_bin_len(2147483647)')>)
Traceback (most recent call last):
File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/tornado/ioloop.py",line 741,in _run_callback
ret = callback()
File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/tornado/tcpserver.py",line 331,in <lambda>
gen.convert_yielded(future),lambda f: f.result()
File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/comm/tcp.py",line 476,in _handle_stream
await self.comm_handler(comm)
File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/core.py",in msgpack._cmsgpack.unpackb
ValueError: 2154341415 exceeds max_bin_len(2147483647)
---------------------------------------------------------------------------
CancelledError Traceback (most recent call last)
<ipython-input-9-e2b8073da6e7> in <module>
1 from dask.distributed import wait
----> 2 wait([xTrainDC,yTrainDC])
/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/client.py in wait(fs,timeout,return_when)
4257 """
4258 client = default_client()
-> 4259 result = client.sync(_wait,fs,timeout=timeout,return_when=return_when)
4260 return result
4261
/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/client.py in sync(self,callback_timeout)
--> 324 result[0] = yield future
325 except Exception as exc:
326 error[0] = sys.exc_info()
/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/tornado/gen.py in run(self)
760
761 try:
--> 762 value = future.result()
763 except Exception:
764 exc_info = sys.exc_info()
CancelledError:
解决方法
我猜是 dask_cudf.read_csv('/home/ubuntu/dataset.csv') 中的某个失败,导致底层 future 的状态不是 finished。这个 CSV 能放进你所用 GPU 的显存吗?你可以试试下面的代码并报告错误信息吗?
这段代码会让 dask 先计算 read_csv 和 iloc 的结果,并等待这些分布式结果计算完成,然后再继续创建 DMatrix。
# Diagnostic version of the pipeline: materialize the CSV slices first so that a
# failure while reading/slicing is reported with its real exception, instead of
# the bare AssertionError raised inside DaskDeviceQuantileDMatrix.
import dask_cudf
import xgboost as xgb
from dask.distributed import Client, futures_of, wait
from dask.utils import parse_bytes
from dask_cuda import LocalCUDACluster

# NOTE(review): dask-cuda runs one worker per GPU; confirm n_workers=5 does not
# exceed the number of visible GPUs (the client log in the question reports 4).
cluster = LocalCUDACluster(
    rmm_pool_size=parse_bytes("9GB"), n_workers=5, threads_per_worker=1
)
client = Client(cluster)

ddb = dask_cudf.read_csv('/home/ubuntu/dataset.csv')
# Features: every column from position 20 onward.
xTrain = ddb.iloc[:, 20:].persist()
# Label: the single column at position 1, kept as a one-column frame.
yTrain = ddb.iloc[:, 1:2].persist()

# Block until every persisted partition has finished (successfully or not).
# wait() itself only raises for cancelled futures, so check for errored
# partitions explicitly and re-raise the first underlying exception.
wait([xTrain, yTrain])
failed = [f for f in futures_of([xTrain, yTrain]) if f.status != "finished"]
if failed:
    failed[0].result()  # re-raises the task's original exception

dTrain = xgb.dask.DaskDeviceQuantileDMatrix(client=client, data=xTrain, label=yTrain)