尝试使用dask延迟来读取和计数csv文件中的行

问题描述

我有5个CSV文件,每个文件包含大约1M记录。我正在使用dask尝试以并行方式读取它们,并对每个记录进行计数,然后求和以获得记录的总数。 这是我的代码

onBlur

但是,尝试运行上述代码时,我收到以下错误消息。

import dask.dataframe as dd
import dask

counts = []

def read_file(fn):
    df = dask.delayed(dd.read_csv)(fn)
    return len(df.index)

for i in range(5):
    filename="c://parallel//test"+str(i)+".csv"
    print(filename)
    counts.append(read_file(filename))
    

dask.compute(sum(counts))

如果我将~\anaconda3\lib\site-packages\dask\delayed.py in __len__(self) 549 def __len__(self): 550 if getattr(self,"_length",None) is None: --> 551 raise TypeError("Delayed objects of unspecified length have no len()") 552 return self._length 553 TypeError: Delayed objects of unspecified length have no len() 行替换为诸如return len(df.index)这样的硬编码值,一切都会按预期进行

谁能告诉我如何解决这个问题。

预先感谢

解决方法

请勿在延迟函数内混合集合(数据框)。 您正在寻找的解决方案应该更简单:

import dask.dataframe as dd

filenames = ["c://parallel//test"+str(i)+".csv" for i in range(5)]
df = dd.read_csv(filenames)
len(df)