问题描述
我正在使用带有两个工作程序和一个调度程序的香草dask-Kubernetes设置来迭代某些JSON文件的行(并应用为简化起见,此处未出现的一些功能)。我看到只有一名工人在工作,我希望在那里能看到两名工人。
希望重新分区有助于我为bag.repartition(num)
试验了不同的值,这些值返回不同的行数,但它们并没有改变有关工作者不平衡和仅集中于一个工作者的内存消耗的任何信息。 >
我认为我不了解分区和工作程序之间的关联,并且在dask文档中找不到任何有关它的信息。任何帮助或指针都非常欢迎!
import dask.bag as db
def grep_buildings():
base = "https://usbuildingdata.blob.core.windows.net/usbuildings-v1-1/"
b = db.text.read_text(f"{base}/Alabama.zip")
# b = b.repartition(2)
lines = b.take(3_000_000)
return lines
len(grep_buildings())
解决方法
在您的示例中,您正在打开文件,并且已压缩
db.text.read_text(f"{base}/Alabama.zip")
Dask能够并行打开和处理多个文件,每个文件至少有一个分区。 Dask还能够将单个文件分成多个块(blocksize
参数);但这仅适用于未压缩的数据。原因是,对于整个文件压缩方法,到达未压缩流中某一点的唯一方法是从头开始读取,因此每个分区都将读取大部分数据。
最后,当从单个分区开始时,重新分区对您没有帮助:您需要先读取整个文件,然后再将数据拆分为下游任务。