Problem description
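My application follows the producer/consumer pattern. There is one producer, two tasks (A and B), and one consumer.
The producer reads a SQL table and sends its output to A and B. Each of them performs some work on that output and sends the result to the consumer. The consumer reads from A and B, then writes to an S3 file.
There are memory channels between Producer & A, Producer & B, A & Consumer, and B & Consumer.
This is how I currently terminate the program (once the producer has exhausted all rows in the SQL table):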
async with trio.open_nursery() as nursery:
    nursery.start_soon(A.run)
    nursery.start_soon(B.run)
    nursery.start_soon(consumer.run)

    while True:
        rowcount = await producer_task.run()
        if not rowcount:
            logging.info('Producer exiting loop')
            # Terminate the tasks' inner loops
            for t in (A, B, consumer):
                t.is_terminated = True
            # Let subtasks wrap up
            await trio.sleep(60 * 5)
            # Terminate all send_channels, subtasks can be stuck receiving.
            for channel in all_channels.keys():
                await channel.aclose()
            break
This is the base class for A and B:
class AsyncSubtask(object):
    def __init__(self, receive_channel, send_channel):
        self.receive_channel = receive_channel
        self.send_channel = send_channel
        self.is_terminated = False

    async def run(self):
        try:
            while not self.is_terminated:
                input_work = await self.receive_channel.receive()
                if input_work:
                    output_work = await self.loop(input_work)
                    await self.send_channel.send(output_work)
                    logging.info(f'{self.__class__.__name__} -> {self.get_logging_name(output_work)}')
                else:
                    logging.warning(f'{self.__class__.__name__} received empty inputs.')
        except trio.EndOfChannel:
            pass
        logging.info(f'{self.__class__.__name__} exiting loop')

    async def loop(self, work):
        raise NotImplementedError

    def get_logging_name(self, output_work):
        return len(output_work)
Traceback (most recent call last):
  File "/myfile/bin/fetch_ott_features.py", line 386, in <module>
    trio.run(parent)
  File "/myfile/lib/python3.6/site-packages/trio/_core/_run.py", line 1896, in run
    raise runner.main_task_outcome.error
  File "/myfile/bin/fetch_ott_features.py", line 379, in parent
    break
  File "/myfile/lib/python3.6/site-packages/trio/_core/_run.py", line 741, in __aexit__
    raise combined_error_from_nursery
  File "/myfile/lib/python3.6/site-packages/a9_ifs_user_reach/async_util.py", line 27, in run
    await self.send_channel.send(output_work)
  File "/myfile/lib/python3.6/site-packages/trio/_channel.py", line 178, in send
    await trio.lowlevel.wait_task_rescheduled(abort_fn)
  File "/myfile/lib/python3.6/site-packages/trio/_core/_traps.py", line 166, in wait_task_rescheduled
    return (await _async_yield(WaitTaskRescheduled(abort_func))).unwrap()
  File "/myfile/lib/python3.6/site-packages/outcome/_sync.py", line 111, in unwrap
    raise captured_error
trio.BrokenResourceError
Note: the break on line 379 is the last line of the async with trio.open_nursery() as nursery block shown above.
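It seems that the way I terminate the program is causing this problem. I have run the program on two separate occasions and got the same error both times.
How should I terminate my program without causing this error?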
Solution
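The traceback shows that the BrokenResourceError comes from the call to await send_channel.send(...). send_channel.send raises this error if you try to send data after the receiving end of the channel has already been closed.
I suspect the problem is that when you do
for channel in all_channels.keys():
    await channel.aclose()
...you are actually closing all of the channels, including the ones that are still in use.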
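If you have data flowing from producer -> A/B -> consumer, the usual pattern for handling shutdown is:
- The producer finishes reading the table and determines that it has nothing more to send.
- The producer closes its channel object and exits.
- A/B eventually finish processing everything in the channel and are then notified that the producer has closed it. If you are using async for blah in receive_channel: ..., the loop terminates once everything has been consumed. If you are calling receive_channel.receive(), you get an EndOfChannel exception that you can catch.
- A/B close their channel objects and exit.
- The consumer eventually finishes processing everything in its incoming channel and is then notified that A/B have closed it. The consumer exits.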
tl;dr: if you write each task like this:
async def producer(send_to_ab):
    # Leaving the block closes the channel, which tells A/B there is no more input.
    async with send_to_ab:
        async for row in fetch_rows_somewhere():
            await send_to_ab.send(row)

async def a_or_b(receive_from_producer, send_to_consumer):
    async with receive_from_producer, send_to_consumer:
        # The loop ends once the producer has closed its end.
        async for row in receive_from_producer:
            result = await do_stuff_with(row)
            await send_to_consumer.send(result)

async def consumer(receive_from_ab):
    async with receive_from_ab:
        # The loop ends once A/B have closed their send ends.
        async for result in receive_from_ab:
            await put_in_s3(result)
...then everything should clean up after itself and terminate automatically and reliably.