问题描述
我一直在努力处理如何使用并发函数每秒钟调用一次函数3次,而不必等待它返回。拨打完所有需要拨打的电话后,我将收集结果。
这是我现在的位置,我很惊讶此示例函数中的sleep()阻止我的代码启动3个函数调用的下一个块。我显然对这里的文档不够了解:)
def print_something(thing):
print(thing)
time.sleep(10)
# define a generator
def chunks(l,n):
"""Yield successive n-sized chunks from l."""
for i in range(0,len(l),n):
yield l[i:i + n]
def main():
chunk_number = 0
alphabet = ['a','b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z']
for current_chunk in chunks(alphabet,3): # Restrict to calling the function 3 times per second
with ProcesspoolExecutor(max_workers=3) as executor:
futures = { executor.submit(print_something,thing): thing for thing in current_chunk }
chunk_number += 1
print('chunk %s' % chunk_number)
time.sleep(1)
for result in as_completed(futures):
print(result.result())
此代码导致打印出3个块,每个块之间的睡眠时间为10s。如何更改此设置以确保在调用下一批之前我不等待函数返回?
谢谢
解决方法
首先,对于for current_chunk in chunks(alphabet,3):
的每次迭代,您都在创建一个新的ProcessPoolExecutor
实例和futures
字典实例,从而破坏了前一个实例。因此,最后一个循环for result in as_completed(futures):
仅打印最后提交的块中的结果。其次,也是我相信您会绞死的原因,由with ProcessPoolExecutor(max_workers=3) as executor:
所管理的块将不会终止,直到executor
所提交的任务完成并且将花费至少10秒钟。因此,for current_chunk in chunks(alphabet,3):
块的下一次迭代不会每10秒执行一次。
还要注意,出于相同的原因,需要在for result in as_completed(futures):
块内移动块with ThreadPoolExecutor(max_workers=26) as executor:
。也就是说,如果将其放置在该位置,它将在所有任务完成之前不会执行,因此您将无法在完成时获得结果。
您需要做一些如下所示的重新排列(我还修改了print_something
以返回除None
之外的其他内容。如果您有足够的工人(26)来挂起,则现在不应挂起运行正在提交的26个任务。我怀疑您的台式机(如果正在PC上运行)具有26个内核,可以同时支持26个执行过程。但是我注意到print_something
仅显示一个短字符串,然后休眠10秒钟,这使它可以将其处理器放弃给池中的另一个进程。因此,在执行CPU密集型任务时,如果指定一个大于数字的max_workers
值,则几乎无济于事可以使用计算机上的实际物理处理器/核数,在这种情况下就可以了,但是当您花费很少时间执行实际Python字节码的任务时,使用线程代替进程是更有效的方法,因为创建线程的成本很高少于创建流程的成本,但是,当您大量运行的任务时,线程的性能非常差由Python字节代码组成,因为由于全局解释器锁(GIL)的序列化,此类代码无法同时执行。
供您研究的主题:全局解释器锁(GIL)和Python字节代码执行
更新为使用线程:
所以我们应该用26个或更多轻量级线程替换ThreadPoolExecutor
来代替ProcessPoolExecutor
。 concurrent.futures
模块的优点是不需要更改其他代码。但最重要的是更改块结构并具有一个executor
。
from concurrent.futures import ThreadPoolExecutor,as_completed
import time
def print_something(thing):
# NOT cpu-intensive,so threads should work well here
print(thing)
time.sleep(10)
return thing # so there is a non-None result
# define a generator
def chunks(l,n):
"""Yield successive n-sized chunks from l."""
for i in range(0,len(l),n):
yield l[i:i + n]
def main():
chunk_number = 0
alphabet = ['a','b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z']
futures = {}
with ThreadPoolExecutor(max_workers=26) as executor:
for current_chunk in chunks(alphabet,3): # Restrict to calling the function 3 times per second
futures.update({executor.submit(print_something,thing): thing for thing in current_chunk })
chunk_number += 1
print('chunk %s' % chunk_number)
time.sleep(1)
# needs to be within the executor block else it won't run until all futures are complete
for result in as_completed(futures):
print(result.result())
if __name__ == '__main__':
main()