Python multiprocessing: passing an iterator as a function argument raises TypeError

Problem description

I have code like this:

    import multiprocessing
    from itertools import product,imap,ifilter

    def test(it):
        for x in it:
            print x     
        return None


    mp_pool = multiprocessing.Pool(multiprocessing.cpu_count())
    it = imap(lambda x: ifilter(lambda y: x+y > 10,xrange(10)),xrange(10))
    result = mp_pool.map(test,it)

I get this error message:

      File "/usr/lib64/python2.7/multiprocessing/process.py", line 114, in run
        self._target(*self._args, **self._kwargs)
      File "/usr/lib64/python2.7/multiprocessing/pool.py", line 102, in worker
        task = get()
      File "/usr/lib64/python2.7/multiprocessing/queues.py", line 376, in get
        return recv()
    TypeError: ifilter expected 2 arguments, got 0

Can't multiprocessing be used with a function that takes an iterator argument? Thanks!

Solution

Your iterator it must yield individual values (each value can be "complex", such as a tuple or a list). Right now we have:

    >>> it
    <itertools.imap object at 0x000000000283DB70>
    >>> list(it)
    [<itertools.ifilter object at 0x000000000283DC50>, <itertools.ifilter object at 0x000000000283DF98>, <itertools.ifilter object at 0x000000000283DBE0>, <itertools.ifilter object at 0x000000000283DF60>, <itertools.ifilter object at 0x000000000283DB00>, <itertools.ifilter object at 0x000000000283DCC0>, <itertools.ifilter object at 0x000000000283DD30>, <itertools.ifilter object at 0x000000000283DDA0>, <itertools.ifilter object at 0x000000000283DE80>, <itertools.ifilter object at 0x000000000284F080>]

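Each item produced by it is itself another iterator (an ifilter object), and that is the root of the problem. Pool.map pickles every item before sending it to a worker process, and an ifilter object cannot be rebuilt from its pickle: while unpickling, the worker ends up calling ifilter() with no arguments, which is the TypeError raised inside recv() in your traceback.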
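To see the constraint in isolation, here is a minimal Python 3 sketch (the answer's code is Python 2; in Python 3, map and filter replace imap and ifilter). survives_pickle is a hypothetical helper that performs the same kind of pickle round-trip that Pool applies to every argument it sends to a worker. Under Python 3 the failure shows up while pickling the lambda inside the filter object, rather than as the "ifilter expected 2 arguments" error from Python 2, but the remedy is the same: give the pool plain values such as ints or lists.

```python
import pickle


def survives_pickle(obj):
    """Return True if obj survives a pickle round-trip, which Pool needs
    for every argument it ships to a worker (hypothetical helper)."""
    try:
        pickle.loads(pickle.dumps(obj, pickle.HIGHEST_PROTOCOL))
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        # Which exception appears depends on the object and the Python version.
        return False


if __name__ == "__main__":
    # Python 3 stand-in for one ifilter(...) item produced by the question's it.
    row = filter(lambda y: 5 + y > 10, range(10))
    print(survives_pickle(row))        # False: the filter wraps an unpicklable lambda
    print(survives_pickle(list(row)))  # True: a list of ints pickles fine
```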

So you have to "iterate over your iterator":

    import multiprocessing
    from itertools import imap, ifilter


    def test(t):
        return 't = ' + str(t) # return value rather than printing


    if __name__ == '__main__': # required for Windows
        mp_pool = multiprocessing.Pool(multiprocessing.cpu_count())
        it = imap(lambda x: ifilter(lambda y: x+y > 10, xrange(10)), xrange(10))
        for the_iterator in it:
            # map() turns the_iterator into a list in this (parent) process,
            # so only plain ints are pickled and sent to the workers
            result = mp_pool.map(test, the_iterator)
            print result
        mp_pool.close() # needed to ensure all processes terminate
        mp_pool.join() # needed to ensure all processes terminate

With it defined as in your question, this prints:

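    []
    []
    ['t = 9']
    ['t = 8', 't = 9']
    ['t = 7', 't = 8', 't = 9']
    ['t = 6', 't = 7', 't = 8', 't = 9']
    ['t = 5', 't = 6', 't = 7', 't = 8', 't = 9']
    ['t = 4', 't = 5', 't = 6', 't = 7', 't = 8', 't = 9']
    ['t = 3', 't = 4', 't = 5', 't = 6', 't = 7', 't = 8', 't = 9']
    ['t = 2', 't = 3', 't = 4', 't = 5', 't = 6', 't = 7', 't = 8', 't = 9']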
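For readers on Python 3, here is a sketch of the same loop, assuming Python 3.3+ (where Pool can be used as a context manager). label_item and lazy_rows are illustrative names; label_item does the job of the answer's test function. As in the Python 2 version, each inner filter object is consumed in the parent (CPython's Pool.map converts an iterable without len() to a list before chunking it), so only ints are pickled.

```python
import multiprocessing


def label_item(t):
    # Plays the role of the answer's test(): return a value instead of printing.
    return 't = ' + str(t)


def lazy_rows():
    # Python 3 spelling of imap(lambda x: ifilter(...), xrange(10)):
    # a lazy iterator whose items are themselves lazy filter objects.
    return map(lambda x: filter(lambda y: x + y > 10, range(10)), range(10))


def main():
    with multiprocessing.Pool(multiprocessing.cpu_count()) as pool:
        # Walk the outer iterator in the parent; hand each inner one to map().
        for inner in lazy_rows():
            print(pool.map(label_item, inner))
    # Leaving the with block calls terminate(); every map() call has already
    # returned by then, so no work is lost.


if __name__ == "__main__":  # required where workers are spawned, e.g. Windows
    main()
```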

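But if you want to take full advantage of multiprocessing (assuming you have enough processors), you can use map_async so that all the jobs are submitted at once: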

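    import multiprocessing
    from itertools import imap, ifilter


    def test(t):
        return 't = ' + str(t) # return value rather than printing


    if __name__ == '__main__': # required for Windows
        mp_pool = multiprocessing.Pool(multiprocessing.cpu_count())
        it = imap(lambda x: ifilter(lambda y: x+y > 10, xrange(10)), xrange(10))
        # submit every job first, then wait for the results in order
        results = [mp_pool.map_async(test, the_iterator) for the_iterator in it]
        for result in results:
            print result.get()
        mp_pool.close() # needed to ensure all processes terminate
        mp_pool.join() # needed to ensure all processes terminate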

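Or you could consider mp_pool.imap. Unlike mp_pool.map_async, it does not first convert the iterable argument to a list in order to work out a suitable chunksize for submitting the jobs (see the documentation, which is not very clear on this point); instead it uses a default chunksize of 1, which is usually undesirable for very large iterables: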

    # used in place of the map_async list comprehension, with a freshly created it
    results = [mp_pool.imap(test, the_iterator) for the_iterator in it]
    for result in results:
        print list(result) # to get a comparable printout as when using map_async
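If the length of the input is known, you can keep imap's lazy, in-order delivery and still avoid one-item chunks by passing chunksize explicitly. In this Python 3 sketch, suggest_chunksize is a hypothetical helper that copies the heuristic CPython's Pool.map uses internally (about four chunks per worker, rounded up); that heuristic is an implementation detail rather than documented behaviour, so treat the result as a starting point to tune.

```python
import multiprocessing


def label_item(t):
    # Stands in for the answer's test(): return a value instead of printing.
    return 't = ' + str(t)


def suggest_chunksize(n_items, n_workers):
    # Hypothetical helper mirroring CPython's internal Pool.map heuristic:
    # roughly four chunks per worker, rounded up.
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1
    return max(chunksize, 1)  # Pool.imap requires chunksize >= 1


def main():
    n_workers = multiprocessing.cpu_count()
    items = range(100_000)  # a large input whose length is known up front
    chunksize = suggest_chunksize(len(items), n_workers)
    with multiprocessing.Pool(n_workers) as pool:
        # Results still arrive lazily and in input order, but each task sent
        # to a worker carries `chunksize` items instead of one.
        for i, labelled in enumerate(pool.imap(label_item, items, chunksize=chunksize)):
            if i < 3:
                print(labelled)


if __name__ == "__main__":  # required where workers are spawned, e.g. Windows
    main()
```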

Update: generating the lists with multiprocessing

    import multiprocessing
    from itertools import ifilter


    def test(t):
        return 't = ' + str(t) # return value rather than printing

    def generate_lists(x):
        return list(ifilter(lambda y: x+y > 10, xrange(10)))

    if __name__ == '__main__': # required for Windows
        mp_pool = multiprocessing.Pool(multiprocessing.cpu_count())
        lists = mp_pool.imap(generate_lists, xrange(10))
        # lists, returned by mp_pool.imap, is an iterable;
        # as each element of lists becomes available, it is passed to test:
        results = mp_pool.imap(test, lists)
        # print each result as it becomes available
        for result in results:
            print result
        mp_pool.close() # needed to ensure all processes terminate
        mp_pool.join() # needed to ensure all processes terminate

This prints:

    t = []
    t = []
    t = [9]
    t = [8, 9]
    t = [7, 8, 9]
    t = [6, 7, 8, 9]
    t = [5, 6, 7, 8, 9]
    t = [4, 5, 6, 7, 8, 9]
    t = [3, 4, 5, 6, 7, 8, 9]
    t = [2, 3, 4, 5, 6, 7, 8, 9]
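A Python 3 sketch of the same two-stage pipeline follows, assuming Python 3.3+. The list comprehension in generate_lists replaces list(ifilter(...)), and label_item (an illustrative name) stands in for test. Because generate_lists is a top-level function that returns a list, nothing unpicklable ever crosses the process boundary.

```python
import multiprocessing


def label_item(t):
    # Stands in for the answer's test(): return a value instead of printing.
    return 't = ' + str(t)


def generate_lists(x):
    # Runs in a worker and returns a plain, picklable list
    # rather than a lazy filter object.
    return [y for y in range(10) if x + y > 10]


def main():
    with multiprocessing.Pool(multiprocessing.cpu_count()) as pool:
        lists = pool.imap(generate_lists, range(10))
        # imap accepts any iterable, including another imap result; each list
        # is passed on to label_item as soon as it becomes available.
        for result in pool.imap(label_item, lists):
            print(result)


if __name__ == "__main__":  # required where workers are spawned, e.g. Windows
    main()
```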