问题描述
我在生产者消费者模式中有两个线程。当消费者收到数据时,它会调用一个耗时的函数 expensive()
,然后进入一个 for
循环。
但是如果消费者正在处理新数据到达时,它应该中止当前工作,(退出循环)并从新数据开始。
我尝试使用 queue.Queue 这样的东西:
q = queue.Queue()
def producer():
while True:
...
q.put(d)
def consumer():
while True:
d = q.get()
expensive(d)
for i in range(10000):
...
if not q.empty():
break
但是这段代码的问题是,如果生产者放入数据太快,并且队列中有很多项目,消费者将执行 expensive(d)
调用加上一次循环迭代,然后对每个项目中止,这很耗时。代码应该可以工作,但没有优化。
解决方法
在不修改 expensive
中的代码的情况下,一种解决方案可能是将其作为单独的进程运行,这将使您能够提前终止它。但是,由于没有提及 expensive
运行多长时间,因此这可能会或可能不会提高时间效率。
import multiprocessing as mp
q = queue.Queue()
def producer():
while True:
...
q.put(d)
def consumer():
while True:
d = q.get()
exp = mp.Thread(target=expensive,args=(d,))
for i in range(10000):
...
if not q.empty():
exp.terminate() # or exp.kill()
break
,
嗯,一种方法是使用队列设计,可以保留等待和工作线程的内部列表。然后,您可以创建多个消费者线程来等待队列,并在工作到达时设置一个已知的消费者线程来完成工作。当线程完成后,它调用队列将自己从工作列表中删除,并将自己添加到等待列表中。
每个消费者线程都有一个“中止”原子,可以通知线程提前结束。线程执行内部循环时会有一些延迟,但这无关紧要....
如果新工作从生产者到达队列,并且工作队列不为空,则可以设置工作线程的“中止”布尔值,并将它们的优先级设置为尽可能低。然后可以将新工作分派到池中的一个等待线程上,从而将其设置为工作。
等待线程将需要一个“开始”函数,该函数向等待线程……好吧……等待的事件/sema/condvar 发出信号。这允许提供工作的生产者设置该特定线程运行,而不是池中的任何线程都可以获取工作的“通常”做法。
这样的设计允许“立即”开始新的工作,通过降低前一个工作线程的优先级使其变得无关紧要,并避免线程/进程终止的开销。