Python多重处理-是否可以将itertools.islice可迭代的变量直接传递给pool.imap,而无需转换为列表?

问题描述

从DB2中读取大表(某些表有1亿个)后,我使用itertools.islice将生成器对象转换为迭代器。我将迭代器传递给multiprocessing pool.map,后者调用一个函数将这些块提取为CSV并行格式。

它可以工作,但是在并行运行开始之前,python pool.map将ITERATOR转换为消耗大量时间的LIST。有没有一种方法可以避免创建此列表或更​​快地将其转换为列表?我也尝试使用POOL.IMAP,但是运行程序时笔记本内核死了。要使用IMAP,我将不得不将迭代器转换为再次花费时间的列表。有什么想法吗?

generator_df = pd.read_sql(query2,test_connection_forbankcv_connection,chunksize = 5000)
iterable_slice = list(it.islice(generator_df,slice_start,slice_end))
results = p.imap(chunk_to_csv,iterable_slice,1) 

解决方法

我马上承认,此解决方案有一些问题,但它显示了基本思想:

import itertools
from typing import Iterable
from multiprocessing import Pool

class Lengthed_ISlice:
    def __init__(self,iterable: Iterable,start: int,stop: int):
        self._start = start
        self._stop = stop
        self._islice = itertools.islice(iterable,self._start,self._stop)

    def __len__(self):
        return self._stop - self._start

    def __iter__(self):
        return iter(self._islice)

这是对islice对象的精简包装,该对象实现了必需的__len__方法,以便可以与Pool的{​​{1}}方法一起使用:

map

主要问题:

  • 除了def double(n): return n * 2 my_list = list(range(10,100)) with Pool() as p: print(p.map(double,Lengthed_ISlice(my_list,2,9))) # Prints [24,26,28,30,32,34,36] 之外,它没有正确地将任何功能委派给基础islice。如果您在扩展此方法的用法时遇到关于缺少方法的错误,则需要实现适当的方法。
  • 为简便起见,由于您没有使用非默认步骤,因此我不理会步骤,并且它们使数学复杂化了一点。
  • 我不担心使用__iter__的通用参数。如果想要更好的类型提示,则应为构造函数参数和Iterable引入TypeVar

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...