使用parallel.futures每秒并行调用fn

问题描述

我一直在努力处理如何使用并发函数每秒钟调用一次函数3次,而不必等待它返回。拨打完所有需要拨打的电话后,我将收集结果。

这是我现在的位置,我很惊讶此示例函数中的sleep()阻止我的代码启动3个函数调用的下一个块。我显然对这里的文档不够了解:)

def print_something(thing):
    print(thing)
    time.sleep(10)

# define a generator 
def chunks(l,n):
    """Yield successive n-sized chunks from l."""    
    for i in range(0,len(l),n):
        yield l[i:i + n]

def main():    
    chunk_number = 0
    alphabet = ['a','b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z']
    for current_chunk in chunks(alphabet,3):  # Restrict to calling the function 3 times per second
        with ProcesspoolExecutor(max_workers=3) as executor:        
            futures = { executor.submit(print_something,thing): thing for thing in current_chunk }
            chunk_number += 1            
            print('chunk %s' % chunk_number)
            time.sleep(1)            
        
    for result in as_completed(futures): 
        print(result.result())

代码导致打印出3个块,每个块之间的睡眠时间为10s。如何更改此设置以确保在调用下一批之前我不等待函数返回?

谢谢

解决方法

首先,对于for current_chunk in chunks(alphabet,3):的每次迭代,您都在创建一个新的ProcessPoolExecutor实例和futures字典实例,从而破坏了前一个实例。因此,最后一个循环for result in as_completed(futures):仅打印最后提交的块中的结果。其次,也是我相信您会绞死的原因,由with ProcessPoolExecutor(max_workers=3) as executor:所管理的块将不会终止,直到executor所提交的任务完成并且将花费至少10秒钟。因此,for current_chunk in chunks(alphabet,3):块的下一次迭代不会每10秒执行一次。

还要注意,出于相同的原因,需要在for result in as_completed(futures):块内移动块with ThreadPoolExecutor(max_workers=26) as executor:。也就是说,如果将其放置在该位置,它将在所有任务完成之前不会执行,因此您将无法在完成时获得结果。

您需要做一些如下所示的重新排列(我还修改了print_something以返回除None之外的其他内容。如果您有足够的工人(26)来挂起,则现在不应挂起运行正在提交的26个任务。我怀疑您的台式机(如果正在PC上运行)具有26个内核,可以同时支持26个执行过程。但是我注意到print_something仅显示一个短字符串,然后休眠10秒钟,这使它可以将其处理器放弃给池中的另一个进程。因此,在执行CPU密集型任务时,如果指定一个大于数字的max_workers值,则几乎无济于事可以使用计算机上的实际物理处理器/核数,在这种情况下就可以了,但是当您花费很少时间执行实际Python字节码的任务时,使用线程代替进程是更有效的方法,因为创建线程的成本很高少于创建流程的成本,但是,当您大量运行的任务时,线程的性能非常差由Python字节代码组成,因为由于全局解释器锁(GIL)的序列化,此类代码无法同时执行。

供您研究的主题:全局解释器锁(GIL)和Python字节代码执行

更新为使用线程:

所以我们应该用26个或更多轻量级线程替换ThreadPoolExecutor来代替ProcessPoolExecutorconcurrent.futures模块的优点是不需要更改其他代码。但最重要的是更改块结构并具有一个executor

from concurrent.futures import ThreadPoolExecutor,as_completed
import time

def print_something(thing):
    # NOT cpu-intensive,so threads should work well here
    print(thing)
    time.sleep(10)
    return thing # so there is a non-None result
    

# define a generator
def chunks(l,n):
    """Yield successive n-sized chunks from l."""
    for i in range(0,len(l),n):
        yield l[i:i + n]

def main():
    chunk_number = 0
    alphabet = ['a','b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z']
    futures = {}
    with ThreadPoolExecutor(max_workers=26) as executor:
        for current_chunk in chunks(alphabet,3):  # Restrict to calling the function 3 times per second
            futures.update({executor.submit(print_something,thing): thing for thing in current_chunk })
            chunk_number += 1
            print('chunk %s' % chunk_number)
            time.sleep(1)

        # needs to be within the executor block else it won't run until all futures are complete    
        for result in as_completed(futures):
            print(result.result())

if __name__ == '__main__':
    main()