How to create a continuous stream of Python's concurrent.futures.ProcessPoolExecutor.submit()?

Problem Description

I am able to submit batches of concurrent.futures.ProcessPoolExecutor.submit(), where each batch may contain several submit() calls. However, I noticed that if each batch of submitted jobs consumes a significant amount of RAM, then RAM usage can become quite inefficient: all the futures in a batch have to complete before another batch of submit() calls can be issued.

How can one create a continuous stream of Python concurrent.futures.ProcessPoolExecutor.submit() calls until some condition is satisfied?

Test script:

#!/usr/bin/env python3

import numpy as np
from numpy.random import default_rng,SeedSequence
import concurrent.futures as cf
from itertools import count


def dojob( process,iterations,samples,rg ):
    # Do some tasks
    result = []
    for i in range( iterations ):
        a = rg.standard_normal( samples )
        b = rg.integers( -3,3,samples )
        mean = np.mean( a + b )
        result.append( ( i,mean ) )
    return { process : result }

if __name__ == '__main__':

    cpus = 2
    iterations = 10000
    samples = 1000

    # Setup NumPy Random Generator
    ss = SeedSequence( 1234567890 )
    child_seeds = ss.spawn( cpus )
    rg_streams = [ default_rng(s) for s in child_seeds ]

    # Perform concurrent analysis by batches
    counter = count( start=0,step=1 )

    # Serial Run of dojob
    process = next( counter )
    for cpu in range( cpus ):
        process = next( counter )
        rg = rg_streams[ cpu ]
        rdict = dojob( process,iterations,samples,rg )
    print( 'rdict',rdict )

    
    # Concurrent Run of dojob
    futures = []
    results = []
    with cf.ProcessPoolExecutor( max_workers=cpus ) as executor:

        while True:
            
            for cpu in range( cpus ):
                process = next( counter )
                rg = rg_streams[ cpu ]
                futures.append( executor.submit( dojob,process,iterations,samples,rg ) )
                
            for future in cf.as_completed( futures ):
                # Do some post processing
                r = future.result()
                for k,v in r.items():
                    if len( results ) < 5000:
                        results.append( np.std( v ) )
                        print( k,len(results) )

            if len(results) <= 100: #Put a huge number to simulate continuous streaming 
                futures = []
                child_seeds = child_seeds[0].spawn( cpus )
                rg_streams = [ default_rng(s) for s in child_seeds ]
            else:
                break

    print( '\n*** Concurrent Analyses Ended ***' )
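
The test script above waits for every future in a batch to finish before the next batch is submitted. For contrast, here is a minimal sketch (illustrative only, not taken from the answers below; the helper name stream_jobs and its arguments are made up) of keeping a bounded set of in-flight futures topped up with concurrent.futures.wait(return_when=FIRST_COMPLETED):

import concurrent.futures as cf

def stream_jobs(executor,make_job,max_in_flight):
    # Illustrative helper: keep at most `max_in_flight` futures pending and
    # refill as soon as any future completes, instead of waiting batch-wise.
    in_flight = set()
    while True:
        while len(in_flight) < max_in_flight:
            job = make_job()  # assumed to return (fn,args), or None when there is no more work
            if job is None:
                break
            fn,args = job
            in_flight.add(executor.submit(fn,*args))
        if not in_flight:
            return
        done,in_flight = cf.wait(in_flight,return_when=cf.FIRST_COMPLETED)
        for future in done:
            yield future.result()

Each result can then be post-processed as soon as it arrives, so at most max_in_flight unfinished jobs are held in memory at any time.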

Solution

The answer posted by @AKX works. Kudos to him. After testing it, I would like to recommend two amendments that I believe are worth considering and implementing.

Amendment 1: To abort the execution of the Python script early, Ctrl+C has to be used. Unfortunately, doing that does not terminate the concurrent.futures.ProcessPoolExecutor() processes that are executing the function dojob(). This issue becomes more pronounced when the time needed to complete dojob() is long; the situation can be simulated by making the sample size in the script large (e.g. samples = 100000). The issue can be seen by running the terminal command ps -ef | grep python. Also, if dojob() consumes a significant amount of RAM, the memory used by those concurrent processes is not released until they are manually killed (e.g. kill -9 [PID]). To address these issues, the following amendment is needed.

with job_cond:
    job_cond.wait()

should be changed to:

try:
    with job_cond:
        job_cond.wait()
except KeyboardInterrupt:
    # Cancel running futures
    for future in running_futures:
        _ = future.cancel()
    # Ensure concurrent.futures.executor jobs really do finish.
    _ = cf.wait(running_futures,timeout=None)

Consequently, when Ctrl+C has to be used, just press it once first. Next, give the futures in running_futures some time to be cancelled. This can take anywhere from a few seconds to considerably longer to complete; it depends on the resource requirements of dojob(). You can watch the CPU activity in your task manager or system monitor drop to zero, or hear the high-pitched whirl of the CPU cooling fan wind down. Note that the RAM used is not released yet. Thereafter, press Ctrl+C again, and this should let all the concurrent processes exit cleanly, thereby also releasing the RAM that was used.
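
As an aside (not part of the original amendment): if you are on Python 3.9 or newer, Executor.shutdown() accepts a cancel_futures argument, so the pending futures can be cancelled in a single call instead of looping over running_futures:

try:
    with job_cond:
        job_cond.wait()
except KeyboardInterrupt:
    # Python 3.9+ only: cancel every future that has not started running yet,
    # then block until the futures that are already running have finished.
    executor.shutdown(wait=True,cancel_futures=True)

Either way, futures that are already executing run to completion, so the RAM they hold is only released once those worker processes exit.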

Amendment 2: Currently, the inner while-loop dictates that jobs are submitted continuously, as fast as the main process's "MainThread" allows. Realistically, there is no benefit in submitting more jobs than there are cpus available in the process pool; doing so only consumes cpu resources of the "MainThread" unnecessarily. To regulate the continuous job submission, a new submit_job threading.Event() object can be used.

First, define such an object and set its value to True:

submit_job = threading.Event()
submit_job.set()

Next, add this condition and the .wait() method at the end of the inner while-loop:

with cf.ProcessPoolExecutor(cpus) as executor:
    while True:
        while len(running_futures) < max_queue_length:
            fn,args = get_job_fn()
            fut = executor.submit(fn,*args)
            fut.add_done_callback(on_complete)
            running_futures.add(fut)
            if len(running_futures) >= cpus: # Add this line
                submit_job.clear()           # Add this line
            submit_job.wait()                # Add this line

Finally, change the on_complete(future) callback to:

def on_complete(future):
    nonlocal jobs_complete
    if process_result_fn(future.result()):
        all_complete_event.set()
    running_futures.discard(future)
    if len(running_futures) < cpus: # add this conditional setting
        submit_job.set()            # add this conditional setting
    jobs_complete += 1
    with job_cond:
        job_cond.notify_all()
For reference, below is the answer posted by @AKX.

To supplement my comment, how about something like using a completion callback and a threading.Condition? I also took the liberty of adding a progress indicator.

EDIT: I refactored this into a neat function you can pass your desired concurrency and queue depth into, as well as a function that generates new jobs and another function that processes the results and lets the executor know whether you have had enough.

import concurrent.futures as cf
import threading
import time
from itertools import count

import numpy as np
from numpy.random import SeedSequence,default_rng


def dojob(process,iterations,samples,rg):
    # Do some tasks
    result = []
    for i in range(iterations):
        a = rg.standard_normal(samples)
        b = rg.integers(-3,3,samples)
        mean = np.mean(a + b)
        result.append((i,mean))
    return {process: result}


def execute_concurrently(cpus,max_queue_length,get_job_fn,process_result_fn):
    running_futures = set()
    jobs_complete = 0
    job_cond = threading.Condition()
    all_complete_event = threading.Event()

    def on_complete(future):
        nonlocal jobs_complete
        if process_result_fn(future.result()):
            all_complete_event.set()
        running_futures.discard(future)
        jobs_complete += 1
        with job_cond:
            job_cond.notify_all()

    time_since_last_status = 0
    start_time = time.time()
    with cf.ProcessPoolExecutor(cpus) as executor:
        while True:
            while len(running_futures) < max_queue_length:
                fn,args = get_job_fn()
                fut = executor.submit(fn,*args)
                fut.add_done_callback(on_complete)
                running_futures.add(fut)

            with job_cond:
                job_cond.wait()

            if all_complete_event.is_set():
                break

            if time.time() - time_since_last_status > 1.0:
                rps = jobs_complete / (time.time() - start_time)
                print(
                    f"{len(running_futures)} running futures on {cpus} CPUs,"
                    f"{jobs_complete} complete. RPS: {rps:.2f}"
                )
                time_since_last_status = time.time()


def main():
    ss = SeedSequence(1234567890)
    counter = count(start=0,step=1)
    iterations = 10000
    samples = 1000
    results = []

    def get_job():
        seed = ss.spawn(1)[0]
        rg = default_rng(seed)
        process = next(counter)
        return dojob,(process,rg)

    def process_result(result):
        for k,v in result.items():
            results.append(np.std(v))
        if len(results) >= 10000:
            return True  # signal we're complete

    execute_concurrently(
        cpus=16,
        max_queue_length=20,
        get_job_fn=get_job,
        process_result_fn=process_result,
    )


if __name__ == "__main__":
    main()