Problem description
I have been using dask + Pandas + PyArrow + HDFS + Parquet for some time in a project that stores tweets in Parquet files and then loads them as dask/Pandas dataframes for analysis. The problem described here only appeared after upgrading dask to >= 2.17.0; earlier versions read the same data without issue.
Minimal example with a simplified dataframe:
import pyarrow
import pandas
from distributed import Client
import dask.dataframe as dd

hdfs_legacy_client = pyarrow.hdfs.connect(host="hdfs", port=9000, user="root")
dask_client = Client("dask-scheduler:8786", set_as_default=False)

data = [{'a': 1, 'b': 2}, {'a': 5, 'b': 10, 'c': 20}]
df = pandas.DataFrame(data)
df.to_parquet("/testing/test.parquet", engine="pyarrow", filesystem=hdfs_legacy_client)

dask_client.compute(dd.read_parquet("hdfs://root@hdfs:9000/testing/test.parquet"), sync=True)
Error traceback:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-13-f4d3cc946643> in <module>
10 df = pandas.DataFrame(data)
11 df.to_parquet("/testing/test.parquet",filesystem=hdfs_legacy_client)
---> 12 dask_client.compute(dd.read_parquet("hdfs://root@hdfs:9000/testing/test.parquet"),sync=True)
/usr/local/lib/python3.7/site-packages/distributed/client.py in compute(self,collections,sync,optimize_graph,workers,allow_other_workers,resources,retries,priority,fifo_timeout,actors,traverse,**kwargs)
2917
2918 if sync:
-> 2919 result = self.gather(futures)
2920 else:
2921 result = futures
/usr/local/lib/python3.7/site-packages/distributed/client.py in gather(self,futures,errors,direct,asynchronous)
   1991             direct=direct,
   1992             local_worker=local_worker,
-> 1993             asynchronous=asynchronous,
   1994         )
   1995
/usr/local/lib/python3.7/site-packages/distributed/client.py in sync(self,func,asynchronous,callback_timeout,*args,**kwargs)
832 else:
833 return sync(
--> 834 self.loop,callback_timeout=callback_timeout,**kwargs
835 )
836
/usr/local/lib/python3.7/site-packages/distributed/utils.py in sync(loop,**kwargs)
337 if error[0]:
338 typ,exc,tb = error[0]
--> 339 raise exc.with_traceback(tb)
340 else:
341 return result[0]
/usr/local/lib/python3.7/site-packages/distributed/utils.py in f()
321 if callback_timeout is not None:
322 future = asyncio.wait_for(future,callback_timeout)
--> 323 result[0] = yield future
324 except Exception as exc:
325 error[0] = sys.exc_info()
/usr/local/lib/python3.7/site-packages/tornado/gen.py in run(self)
733
734 try:
--> 735 value = future.result()
736 except Exception:
737 exc_info = sys.exc_info()
/usr/local/lib/python3.7/site-packages/distributed/client.py in _gather(self,local_worker)
1850 exc = CancelledError(key)
1851 else:
-> 1852 raise exception.with_traceback(traceback)
1853 raise exc
1854 if errors == "skip":
/usr/local/lib/python3.7/site-packages/distributed/protocol/pickle.py in loads()
73 return pickle.loads(x,buffers=buffers)
74 else:
---> 75 return pickle.loads(x)
76 except Exception as e:
77 logger.info("Failed to deserialize %s",x[:10000],exc_info=True)
/usr/local/lib/python3.7/site-packages/twisted/persisted/styles.py in unpickleMethod()
97 return getattr(im_class,im_name)
98 try:
---> 99 methodFunction = _methodFunction(im_class,im_name)
100 except AttributeError:
101 log.msg("Method",im_name,"not on class",im_class)
/usr/local/lib/python3.7/site-packages/twisted/persisted/styles.py in _methodFunction()
74 @rtype: L{types.FunctionType}
75 """
---> 76 methodobject = getattr(classObject,methodName)
77 if _PY3:
78 return methodobject
AttributeError: type object 'type' has no attribute 'read_partition'
Other possibly relevant information:
- The project architecture consists of HDFS, a dask scheduler, and 3 dask workers, each running in its own Docker container
- Package versions are pyarrow==1.0.1, dask==2.27.0 (although any version above 2.16.0 produces the same error), distributed==2.27.0, pandas==1.1.1
- Reading the exact same file works fine with PyArrow read_table, Pandas read_parquet, dask with a local cluster, or dask 2.16.0 with this distributed cluster (a sketch of these read paths follows this list)
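For reference, a minimal sketch of the read paths reported to work above, reusing the hdfs_legacy_client and file path from the example (the exact keyword arguments are assumptions and may vary slightly between versions):

import pyarrow
import pyarrow.parquet as pq
import pandas
import dask.dataframe as dd

hdfs_legacy_client = pyarrow.hdfs.connect(host="hdfs", port=9000, user="root")

# PyArrow: read the table directly through the legacy HDFS client
table = pq.read_table("/testing/test.parquet", filesystem=hdfs_legacy_client)
df = table.to_pandas()

# Pandas: reading via the hdfs:// URL, as reported to work in this environment
df = pandas.read_parquet("hdfs://root@hdfs:9000/testing/test.parquet")

# dask with the default local scheduler (no distributed Client involved)
ddf = dd.read_parquet("hdfs://root@hdfs:9000/testing/test.parquet")
print(ddf.compute())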
Workaround
No effective workaround has been found yet.
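While no fix is known, a basic sanity check when debugging is to confirm that the client, scheduler and workers all report the same package versions, since the failure only appears past dask 2.16.0. A minimal sketch reusing the dask_client from the example above (check=True makes distributed raise on a mismatch):

# Compare dask/distributed/pyarrow/pandas versions across the cluster;
# check=True raises if client, scheduler and workers disagree.
versions = dask_client.get_versions(check=True)
print(versions)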