问题描述
我有一个 pika RabbitMQ 实现,其中消息被提交到队列,并由消费者处理。处理每条消息可能需要一些时间,而且我只想在任何给定时间为每个消费者处理一条消息。
为此,我在单独的工作线程中处理消息,并最终在任务完成时通过添加回调线程安全来确认消息。但是,主线程仍在消费消息。一种方法是在工作完成时发送 nack
条消息,但随着所有消费者变得忙碌,队列中会出现大量重新提交。
有什么办法可以“暂停”消费,同时仍然保持消息未确认?
busy = multiprocessing.Value('b',False)
def on_message(channel,method_frame,header_frame,body):
with busy.get_lock():
am_busy = busy.value
if not am_busy:
busy.value = True
if not am_busy:
thread = threading.Thread(target=do_work,args=(channel,method_frame.delivery_tag,body))
thread.start()
else:
nack_message(channel,method_frame.delivery_tag)
def do_work(channel,delivery_tag,body):
time.sleep(60) # do something with body content...
cb = functools.partial(ack_message,channel,delivery_tag)
connection.add_callback_threadsafe(cb)
with busy.get_lock():
busy.value = False
def ack_message(ch,delivery_tag):
if ch.is_open:
ch.basic_ack(delivery_tag)
else:
pass
def nack_message(ch,delivery_tag):
if ch.is_open:
ch.basic_nack(delivery_tag=delivery_tag,requeue=True)
else:
pass
def main():
connection = pika.BlockingConnection(parameters=parameters)
channel = connection.channel()
queue_name = "work"
channel.queue_declare(queue=queue_name)
on_message_callback = functools.partial(on_message,connection=connection)
channel.basic_consume(queue=queue_name,on_message_callback=on_message_callback)
channel.start_consuming()
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)