问题描述
import multiprocessing as mp
import time
import datetime
def yielder(IMapIterator):
    """Yield items from ``IMapIterator`` until it is exhausted.

    Args:
        IMapIterator: Any iterator, e.g. the object returned by
            ``multiprocessing.Pool.imap``.

    Yields:
        Each item produced by the underlying iterator, in order.

    Raises:
        Any exception other than ``StopIteration`` raised while fetching the
        next item (for instance an error re-raised from a pool worker) is
        propagated to the caller instead of being silently discarded.
    """
    while True:
        try:
            # The builtin next() works with every iterator, not only with
            # objects that happen to expose a ``.next()`` method.
            item = next(IMapIterator)
        except StopIteration:
            # Normal end of the underlying iterator: end this generator too.
            return
        # Yield outside the ``try`` so exceptions thrown into the generator
        # by the consumer are not mistaken for exhaustion.
        yield item
def download_one(x, delay=1):
    """Simulate downloading ``x`` and return it tagged with ``'_/'``.

    Args:
        x: String identifying the object to "download".
        delay: Seconds to sleep to imitate download latency. Defaults to 1,
            the value that used to be hard-coded, so existing single-argument
            callers (such as ``Pool.imap``) behave exactly as before.

    Returns:
        ``x`` with the suffix ``'_/'`` appended.
    """
    time.sleep(delay)
    return x + '_/'
def test_one(x):
time.sleep(0.3)
return x + '\\'
if __name__ == '__main__':
    # The guard is required with the "spawn" start method (Windows, macOS):
    # worker processes import this module and must not start pools again.
    #
    # Two-stage pipeline: the download pool produces items, and its (lazy)
    # output generator is consumed as the task source of the test pool.
    # Leaving the ``with`` block terminates both pools, so no worker
    # processes are leaked.
    with mp.Pool(2) as pool_download, mp.Pool(2) as pool_test:
        iterator_download = pool_download.imap(download_one, ['a', 'b', 'c', 'd'])
        generator_download = yielder(iterator_download)

        iterator_test = pool_test.imap(test_one, generator_download)
        generator_test = yielder(iterator_test)

        # Print at most 20 results. zip() stops as soon as the results run
        # out, so the loop ends cleanly instead of raising StopIteration on
        # the fifth iteration; ``range`` comes first so that no result is
        # consumed and dropped when the cap is reached.
        for _, result in zip(range(20), generator_test):
            # ``utcNow`` does not exist; use a timezone-aware UTC timestamp.
            print(str(datetime.datetime.now(datetime.timezone.utc)), result)
            time.sleep(0.5)
其中 generator_download 是一个生成器。
另外还有下面这段无法正常运行的代码,它会抛出标题中所述的错误,其中 generator_download 同样是一个生成器:
import multiprocessing as mp
def yielder(queue):
    """Turn a queue into a generator that stops at the first ``None``.

    Args:
        queue: Object with a ``get()`` method returning the next element
            (called with no arguments each time).

    Yields:
        Every element fetched from ``queue`` until ``None`` is received.
        The ``None`` sentinel itself is consumed but never yielded.
    """
    item = queue.get()
    while item is not None:
        yield item
        # Fetch the following element only after the consumer resumes us.
        item = queue.get()
# Start a manager; the queue and the pool below are both created through it.
manager = mp.Manager()
# Manager-provided queue, later wrapped by yielder() as the pool's task source.
queue_download = manager.Queue()
# NOTE: this pool is created through the manager rather than with mp.Pool.
# The resolution at the end of this file reports that replacing this line
# with ``mp.Pool(2)`` makes the error go away.
pool_downloader = manager.Pool(2)
# Sample inputs; nothing in this snippet puts them on queue_download.
list_urls = ['a','d']
def downloader(url, max_delay=1):
    """Simulate downloading ``url`` and return it tagged with ``'_'``.

    Args:
        url: String identifying the object to "download".
        max_delay: Upper bound, in seconds, of the random sleep that imitates
            download latency. Defaults to 1, which reproduces the previous
            ``np.random.random()`` delay in ``[0, 1)`` seconds.

    Returns:
        ``url`` with the suffix ``'_'`` appended.
    """
    time.sleep(np.random.random() * max_delay)
    # The original returned ``a + '_'``; ``a`` is not defined anywhere, so
    # every call failed with NameError. The argument is what must be tagged.
    return url + '_'
# Generator over queue_download: yields each element obtained from get()
# and finishes when it receives None.
generator_download = yielder(queue_download)
# Use that generator as the iterable of tasks for downloader in the pool.
r = pool_downloader.imap(downloader,generator_download)
有人能解释一下这两者的区别吗?总体来说,我希望有 3 个队列和 2 个进程池,它们持续不断地下载并测试一些对象,直到最后一个队列为空。
解决方法
问题已解决,做法是把下面这一行:
pool_downloader = manager.Pool(2)
替换为:
pool_downloader = mp.Pool(2)
我不确定自己的理解是否正确,不过看起来是 manager 的一个副本试图把自身传递给自己,从而导致了这个错误。