问题描述
我能够提交concurrent.futures.ProcesspoolExecutor.submits()
的批次,其中每个批次可能包含多个submit()
。但是,我注意到,如果每批提交都消耗大量RAM,那么RAM使用效率可能会相当低;需要等待一批中的所有期货完成,然后才能提交另一批submit()
。
在满足某些条件之前,如何创建连续的Python concurrent.futures.ProcesspoolExecutor.submit()
流?
测试脚本:
#!/usr/bin/env python3
import numpy as np
from numpy.random import default_rng,SeedSequence
import concurrent.futures as cf
from itertools import count
def dojob( process,iterations,samples,rg ):
# Do some tasks
result = []
for i in range( iterations ):
a = rg.standard_normal( samples )
b = rg.integers( -3,3,samples )
mean = np.mean( a + b )
result.append( ( i,mean ) )
return { process : result }
if __name__ == '__main__':
cpus = 2
iterations = 10000
samples = 1000
# Setup NumPy Random Generator
ss = SeedSequence( 1234567890 )
child_seeds = ss.spawn( cpus )
rg_streams = [ default_rng(s) for s in child_seeds ]
# Peform concurrent analysis by batches
counter = count( start=0,step=1 )
# Serial Run of dojob
process = next( counter )
for cpu in range( cpus ):
process = next( counter )
rg = rg_streams[ cpu ]
rdict = dojob( process,rg )
print( 'rdict',rdict )
# Concurrent Run of dojob
futures = []
results = []
with cf.ProcesspoolExecutor( max_workers=cpus ) as executor:
while True:
for cpu in range( cpus ):
process = next( counter )
rg = rg_streams[ cpu ]
futures.append( executor.submit( dojob,process,rg ) )
for future in cf.as_completed( futures ):
# Do some post processing
r = future.result()
for k,v in r.items():
if len( results ) < 5000:
results.append( np.std( v ) )
print( k,len(results) )
if len(results) <= 100: #Put a huge number to simulate continuous streaming
futures = []
child_seeds = child_seeds[0].spawn( cpus )
rg_streams = [ default_rng(s) for s in child_seeds ]
else:
break
print( '\n*** Concurrent Analyses Ended ***' )
解决方法
@AKX 发布的答案有效。为他点赞。经过测试,我想推荐两个我认为值得考虑和实施的修正案。
修正1:要提前取消python脚本的执行,必须使用Ctrl+C。不幸的是,这样做不会终止正在执行函数 concurrent.futures.ProcessPoolExecutor()
的 dojob()
进程。当完成 dojob()
所用的时间很长时,这个问题变得更加明显;这种情况可以通过使脚本中的样本量变大(例如 samples = 100000
)来模拟。执行终端命令 ps -ef | grep python
时可以看到此问题。此外,如果 dojob()
消耗大量 RAM,则在手动终止并发进程(例如 kill -9 [PID]
)之前,这些并发进程使用的内存不会被释放。为了解决这些问题,需要进行以下修改。
with job_cond:
job_cond.wait()
应改为:
try:
with job_cond:
job_cond.wait()
except KeyboardInterrupt:
# Cancel running futures
for future in running_futures:
_ = future.cancel()
# Ensure concurrent.futures.executor jobs really do finish.
_ = cf.wait(running_futures,timeout=None)
因此,当必须使用 Ctrl+C 时,您只需先按一次即可。接下来,给 running_futures
中的期货取消一些时间。这可能需要几秒钟到几秒钟才能完成;这取决于 dojob()
的资源需求。您可以在任务管理器或系统监视器中看到 CPU 活动下降到零或听到 CPU 冷却风扇降低的高转速声音。请注意,使用的 RAM 尚未释放。此后,再次按 Ctrl+C,这应该允许所有并发进程干净退出,从而也释放使用的 RAM。
修正 2: 目前,内部 while 循环规定必须以 CPU“mainThread”允许的速度连续提交作业。实际上,提交比 cpus 池中可用 cpus 多的作业没有任何好处。这样做只会不必要地消耗来自主处理器“MainThread”的 cpu 资源。为了规范连续作业提交,可以使用新的 submit_job
threading.Event()
对象。
首先,定义这样一个对象并将其值设置为True
:
submit_job = threading.Event()
submit_job.set()
接下来,在内部while循环的末尾添加这个条件和.wait()
方法:
with cf.ProcessPoolExecutor(cpus) as executor:
while True:
while len(running_futures) < max_queue_length:
fn,args = get_job_fn()
fut = executor.submit(fn,*args)
fut.add_done_callback(on_complete)
running_futures.add(fut)
if len(running_futures) >= cpus: # Add this line
submit_job.clear() # Add this line
submit_job.wait() # Add this line
最后将 on_complete(future)
回调改为:
def on_complete(future):
nonlocal jobs_complete
if process_result_fn(future.result()):
all_complete_event.set()
running_futures.discard(future)
if len(running_futures) < cpus: # add this conditional setting
submit_job.set() # add this conditional setting
jobs_complete += 1
with job_cond:
job_cond.notify_all()
,
要补充我的评论,使用完成回调和threading.Condition
这样的事情怎么样?我也随意添加进度指示器。
编辑:我将其重构为一个整洁的函数,您可以通过它传递所需的并发性和队列深度,以及一个生成新作业的函数,以及另一个处理结果并使执行者知道的函数。你是否受够了。
import concurrent.futures as cf
import threading
import time
from itertools import count
import numpy as np
from numpy.random import SeedSequence,default_rng
def dojob(process,iterations,samples,rg):
# Do some tasks
result = []
for i in range(iterations):
a = rg.standard_normal(samples)
b = rg.integers(-3,3,samples)
mean = np.mean(a + b)
result.append((i,mean))
return {process: result}
def execute_concurrently(cpus,max_queue_length,get_job_fn,process_result_fn):
running_futures = set()
jobs_complete = 0
job_cond = threading.Condition()
all_complete_event = threading.Event()
def on_complete(future):
nonlocal jobs_complete
if process_result_fn(future.result()):
all_complete_event.set()
running_futures.discard(future)
jobs_complete += 1
with job_cond:
job_cond.notify_all()
time_since_last_status = 0
start_time = time.time()
with cf.ProcessPoolExecutor(cpus) as executor:
while True:
while len(running_futures) < max_queue_length:
fn,args = get_job_fn()
fut = executor.submit(fn,*args)
fut.add_done_callback(on_complete)
running_futures.add(fut)
with job_cond:
job_cond.wait()
if all_complete_event.is_set():
break
if time.time() - time_since_last_status > 1.0:
rps = jobs_complete / (time.time() - start_time)
print(
f"{len(running_futures)} running futures on {cpus} CPUs,"
f"{jobs_complete} complete. RPS: {rps:.2f}"
)
time_since_last_status = time.time()
def main():
ss = SeedSequence(1234567890)
counter = count(start=0,step=1)
iterations = 10000
samples = 1000
results = []
def get_job():
seed = ss.spawn(1)[0]
rg = default_rng(seed)
process = next(counter)
return dojob,(process,rg)
def process_result(result):
for k,v in result.items():
results.append(np.std(v))
if len(results) >= 10000:
return True # signal we're complete
execute_concurrently(
cpus=16,max_queue_length=20,get_job_fn=get_job,process_result_fn=process_result,)
if __name__ == "__main__":
main()