在消息未确认时暂停消费消息

问题描述

我有一个 pika RabbitMQ 实现,其中消息被提交到队列,并由消费者处理。处理每条消息可能需要一些时间,而且我只想在任何给定时间为每个消费者处理一条消息。

为此,我在单独的工作线程中处理消息,并最终在任务完成时通过添加回调线程安全来确认消息。但是,主线程仍在消费消息。一种方法是在工作完成时发送 nack 条消息,但随着所有消费者变得忙碌,队列中会出现大量重新提交。

有什么办法可以“暂停”消费,同时仍然保持消息未确认?

busy = multiprocessing.Value('b',False)


def on_message(channel,method_frame,header_frame,body):
    with busy.get_lock():
        am_busy = busy.value
        if not am_busy:
            busy.value = True
    if not am_busy:
        thread = threading.Thread(target=do_work,args=(channel,method_frame.delivery_tag,body))
        thread.start()
    else:
        nack_message(channel,method_frame.delivery_tag)
        
        
def do_work(channel,delivery_tag,body):
    time.sleep(60) # do something with body content...
    cb = functools.partial(ack_message,channel,delivery_tag)
    connection.add_callback_threadsafe(cb)
    
    with busy.get_lock():
        busy.value = False
        
        
def ack_message(ch,delivery_tag):
    if ch.is_open:
        ch.basic_ack(delivery_tag)
    else:
        pass


def nack_message(ch,delivery_tag):
    if ch.is_open:
        ch.basic_nack(delivery_tag=delivery_tag,requeue=True)
    else:
        pass
        
        
def main():
    connection = pika.BlockingConnection(parameters=parameters)
    channel = connection.channel()
    queue_name = "work"
    channel.queue_declare(queue=queue_name)
    on_message_callback = functools.partial(on_message,connection=connection)
    channel.basic_consume(queue=queue_name,on_message_callback=on_message_callback)

    channel.start_consuming()

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...