问题描述
是否可以让ThreadPoolExecutor
等待其所有期货及其add_done_callback()
函数完成而不必调用.shutdown(wait=True)
?下面的代码片段说明了我要完成的工作的实质,即在外循环迭代之间重用线程池。
from concurrent.futures import ThreadPoolExecutor,wait
import time
def proc_func(n):
return n + 1
def create_callback_func(fid,sleep_time):
def callback(future):
time.sleep(sleep_time)
fid.write(str(future.result()))
return
return callback
num_workers = 4
num_files_write = 3
num_tasks = 8
sleep_time = 1
pool = ThreadPoolExecutor(max_workers=num_workers)
for n in range(num_files_write):
fid = open(f'test{n}.txt','w')
futs = []
callback_func = create_callback_func(fid,sleep_time)
for t in range(num_tasks):
fut = pool.submit(proc_func,n)
fut.add_done_callback(callback_func)
futs.append(fut)
wait(futs)
fid.close()
pool.shutdown(wait=True)
运行此代码将抛出一堆ValueError: I/O operation on closed file.
,并且所写入的三个文件均包含以下内容:
test0.txt-> 1111
test1.txt-> 2222
test3.txt-> 3333
很显然,这是错误的,每个数字应该有八个。如果我为每个文件创建并关闭一个单独的ThreadPoolExecutor
,那么将获得正确的结果。因此,我知道Executor
可以适当地等待所有回调完成,但是我可以告诉它这样做而不必关闭它吗?
解决方法
恐怕无法完成,您正在“滥用”回调。
回调的主要目的是通知计划的工作已经完成。
内部将来的状态为PENDING-> RUNNING-> FINISHED(不考虑简短性)。达到FINISHED状态时,将调用回调,但完成时将没有下一个状态。这就是为什么无法与该事件同步。
在一个可用线程中执行提交函数的核心是(简化):
try:
result = self.fn(*self.args,**self.kwargs)
except BaseException as exc:
self.future.set_exception(exc)
else:
self.future.set_result(result)
set_exception
和set_result
都看起来像这样(非常简化):
... save the result/exception
self._state = FINISHED
... wakeup all waiters
self._invoke_callbacks() # this is the last statement
将来已经完成,即当调用“完成”回调时处于“完成”状态。在标记完成之前通知工作已经完成是没有意义的。
正如您已经注意到的,在您的代码中:
wait(futs)
fid.close()
wait
返回,文件关闭,但是回调尚未完成,并且无法尝试写入已关闭的文件。
第二个问题是shutdown(wait=True)
为什么起作用?仅仅是因为它等待所有线程:
if wait:
for t in self._threads:
t.join()
那些线程也执行回调(请参见上面的代码段)。这就是为什么线程完成后必须完成回调执行的原因。