pika 和 rabbitMQ 每次从队列中获取一条消息,但在 python 中不断调用回调函数

问题描述

我希望进程正常退出,直到其当前任务完成。 (如果它在回调函数中做某事)。逻辑很简单,一旦有 SIGINT,我就会引发 is_interrupt,并且在我的无限 while 循环中,我不断检查队列中是否有消息。

如果队列中有消息,我会生成一个子进程来执行回调函数。在回调函数中,它以 sys.exit(0) 结尾,以确保一旦孩子完成回调,它就会退出

目前我面临的问题是:

如果发送方将一些消息发送到队列中,然后接收方(下面的代码)启动。它可以处理那些存在的消息,然后继续进入回调函数。子进程不断生成并失败。 (我不明白是什么导致这种情况发生。一旦子进程完成它的任务,它就会消失,主进程将继续循环并等待消息。

如果接收者先启动,然后发送者发布一些消息,接收者将什么也收不到。

请帮帮我。谢谢!

import pika,sys,os,signal,requests

is_interrupt = 0

def signal_handler(sig,frame):
    global is_interrupt
    is_interrupt = 1

def main():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    queue_state = channel.queue_declare(queue='task_queue',durable=True,passive=True)
    
    # Register signal handler. 
    signal.signal(signal.SIGINT,signal_handler)

    def callback(ch,method,properties,body):
        # Do something here..
        os._exit(0)

    print('Worker waiting for messages. To exit press CTRL+C')

    while(True):
        if is_interrupt == 1:
            print("\nExit")
            os._exit(0)
        queue_empty = queue_state.message_count == 0
        #print(queue_empty)
        if not queue_empty:
            child_pid = os.fork()
            if child_pid == 0:
                method,body = channel.basic_get(queue='task_queue',auto_ack=True)
                callback(channel,body)
            else:
                queue_empty = True
                os.wait()

if __name__ == '__main__':
    main()

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...