Error when reading a Parquet file from HDFS with Dask and the PyArrow engine

Problem description

I have been using dask + Pandas + PyArrow + HDFS + Parquet in a project for some time, storing tweets in Parquet files and then loading them as dask / Pandas dataframes for analysis. The problem described below appeared only after I upgraded dask to >= 2.17.0 (earlier versions run this code correctly).

A minimal example with a simplified dataframe:

import pyarrow
import pandas
from distributed import Client
import dask.dataframe as dd

# Legacy PyArrow HDFS client, used to write the file
hdfs_legacy_client = pyarrow.hdfs.connect(host="hdfs", port=9000, user="root")
dask_client = Client("dask-scheduler:8786", set_as_default=False)

data = [{'a': 1, 'b': 2}, {'a': 5, 'b': 10, 'c': 20}]
df = pandas.DataFrame(data)
df.to_parquet("/testing/test.parquet", engine="pyarrow", filesystem=hdfs_legacy_client)
# This line fails on dask >= 2.17.0
dask_client.compute(dd.read_parquet("hdfs://root@hdfs:9000/testing/test.parquet"), sync=True)

Error traceback:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-13-f4d3cc946643> in <module>
     10 df = pandas.DataFrame(data)
     11 df.to_parquet("/testing/test.parquet", filesystem=hdfs_legacy_client)
---> 12 dask_client.compute(dd.read_parquet("hdfs://root@hdfs:9000/testing/test.parquet"), sync=True)

/usr/local/lib/python3.7/site-packages/distributed/client.py in compute(self, collections, sync, optimize_graph, workers, allow_other_workers, resources, retries, priority, fifo_timeout, actors, traverse, **kwargs)
   2917 
   2918         if sync:
-> 2919             result = self.gather(futures)
   2920         else:
   2921             result = futures

/usr/local/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1991                 direct=direct,
   1992                 local_worker=local_worker,
-> 1993                 asynchronous=asynchronous,
   1994             )
   1995 

/usr/local/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    832         else:
    833             return sync(
--> 834                 self.loop, callback_timeout=callback_timeout, **kwargs
    835             )
    836 

/usr/local/lib/python3.7/site-packages/distributed/utils.py in sync(loop, **kwargs)
    337     if error[0]:
    338         typ, exc, tb = error[0]
--> 339         raise exc.with_traceback(tb)
    340     else:
    341         return result[0]

/usr/local/lib/python3.7/site-packages/distributed/utils.py in f()
    321             if callback_timeout is not None:
    322                 future = asyncio.wait_for(future, callback_timeout)
--> 323             result[0] = yield future
    324         except Exception as exc:
    325             error[0] = sys.exc_info()

/usr/local/lib/python3.7/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

/usr/local/lib/python3.7/site-packages/distributed/client.py in _gather(self, local_worker)
   1850                             exc = CancelledError(key)
   1851                         else:
-> 1852                             raise exception.with_traceback(traceback)
   1853                         raise exc
   1854                     if errors == "skip":

/usr/local/lib/python3.7/site-packages/distributed/protocol/pickle.py in loads()
     73             return pickle.loads(x, buffers=buffers)
     74         else:
---> 75             return pickle.loads(x)
     76     except Exception as e:
     77             logger.info("Failed to deserialize %s", x[:10000], exc_info=True)

/usr/local/lib/python3.7/site-packages/twisted/persisted/styles.py in unpickleMethod()
     97         return getattr(im_class, im_name)
     98     try:
---> 99         methodFunction = _methodFunction(im_class, im_name)
    100     except AttributeError:
    101         log.msg("Method", im_name, "not on class", im_class)

/usr/local/lib/python3.7/site-packages/twisted/persisted/styles.py in _methodFunction()
     74     @rtype: L{types.FunctionType}
     75     """
---> 76     methodobject = getattr(classObject, methodName)
     77     if _PY3:
     78         return methodobject

AttributeError: type object 'type' has no attribute 'read_partition'

Other information that may be relevant:

  1. The project architecture consists of HDFS, a dask scheduler, and 3 dask workers, each running in its own Docker container
  2. Package versions are pyarrow==1.0.1, dask==2.27.0 (although any version above 2.16.0 produces the same error), distributed==2.27.0, pandas==1.1.1
  3. Reading the exact same file works fine with PyArrow's read_table, Pandas' read_parquet, dask with a local cluster, or dask 2.16.0 with this distributed cluster
