Python多重处理-是否可以将itertools.islice可迭代的变量直接传递给pool.imap,而无需转换为列表?

问题描述

从DB2中读取大表(某些表有1亿个)后,我使用itertools.islice将生成器对象转换为迭代器。我将迭代器传递给multiprocessing pool.map,后者调用一个函数将这些块提取为CSV并行格式。

它可以工作,但是在并行运行开始之前,python pool.map将IteraTOR转换为消耗大量时间的LIST。有没有一种方法可以避免创建此列表或更​​快地将其转换为列表?我也尝试使用POOL.IMAP,但是运行程序时笔记本内核死了。要使用IMAP,我将不得不将迭代器转换为再次花费时间的列表。有什么想法吗?

generator_df = pd.read_sql(query2,test_connection_forbankcv_connection,chunksize = 5000)
iterable_slice = list(it.islice(generator_df,slice_start,slice_end))
results = p.imap(chunk_to_csv,iterable_slice,1) 

解决方法

我马上承认,此解决方案有一些问题,但它显示了基本思想:

import itertools
from typing import Iterable
from multiprocessing import Pool

class Lengthed_ISlice:
    def __init__(self,iterable: Iterable,start: int,stop: int):
        self._start = start
        self._stop = stop
        self._islice = itertools.islice(iterable,self._start,self._stop)

    def __len__(self):
        return self._stop - self._start

    def __iter__(self):
        return iter(self._islice)

这是对islice对象的精简包装,该对象实现了必需的__len__方法,以便可以与Pool的{​​{1}}方法一起使用:

map

主要问题:

  • 除了def double(n): return n * 2 my_list = list(range(10,100)) with Pool() as p: print(p.map(double,Lengthed_ISlice(my_list,2,9))) # Prints [24,26,28,30,32,34,36] 之外,它没有正确地将任何功能委派给基础islice。如果您在扩展此方法的用法时遇到关于缺少方法的错误,则需要实现适当的方法。
  • 为简便起见,由于您没有使用非默认步骤,因此我不理会步骤,并且它们使数学复杂化了一点。
  • 我不担心使用__iter__的通用参数。如果想要更好的类型提示,则应为构造函数参数和Iterable引入TypeVar