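Problem description
I am trying to set up a Python microservice that receives messages through RabbitMQ and also exposes a /health REST endpoint for Kubernetes health checks. I use pika for the RabbitMQ consumer and connexion for the REST endpoint.
However, when I start the RabbitMQ consumer in main(), the connexion app never starts.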
python-app.py
#!/usr/bin/env python
import pika, sys, os, connexion
from flask import Flask, request, jsonify

app = connexion.FlaskApp(__name__, specification_dir='./')

def main():
    # Connection
    ...
    # Exchange and queues
    ...
    def callback(ch, method, properties, body):
        ...
    channel.basic_consume(queue='pg-python', on_message_callback=callback, auto_ack=True)
    print(' [*] Waiting for messages.')
    channel.start_consuming()
    app.run(port=8080, use_reloader=False)

@app.route('/api/v1/health', methods=['GET'])
def return_health():
    message = {'status': 'Healthy! <3'}
    return jsonify(message)

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)
Output:
[*] Waiting for messages.
How do I initialize both components correctly? Do I need to use threads?
Solution
I solved this by starting the RabbitMQ consumer in a separate thread:
#!/usr/bin/env python
import pika, sys, os, threading
from flask import Flask, request, jsonify

app = Flask(__name__)

def start_rmq_connection():
    # Connection
    ...
    # Exchange and queues
    ...
    def callback(ch, method, properties, body):
        ...
    channel.basic_consume(queue='pg-python', on_message_callback=callback, auto_ack=True)
    print(' [*] Waiting for messages.')
    # Blocks this thread for as long as the consumer is running
    channel.start_consuming()

@app.route('/api/v1/health', methods=['GET'])
def return_health():
    message = {'status': 'Healthy! <3'}
    return jsonify(message)

if __name__ == '__main__':
    try:
        # Run the blocking consumer in its own thread so Flask can start
        thread_1 = threading.Thread(target=start_rmq_connection)
        thread_1.start()
        # join(0) returns immediately; it does not wait for the consumer
        thread_1.join(0)
        app.run()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)
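
The reason the first version hangs is that channel.start_consuming() blocks the calling thread, so the app.run() line after it is never reached. Below is a minimal sketch of the same threading pattern that can be run without RabbitMQ or Flask. It rests on several assumptions that are not part of the code above: consume_forever is a hypothetical stand-in for the pika consumer loop, the standard library's http.server replaces Flask purely to avoid dependencies, and health_status, make_handler and serve are illustrative names. It also goes one step further than the answer by having the health endpoint return 503 once the consumer thread has exited, which may or may not be what you want from your Kubernetes probe.

```python
import json
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer


def consume_forever(stop_event):
    """Hypothetical stand-in for channel.start_consuming(): blocks until stopped."""
    while not stop_event.wait(0.05):
        pass  # a real consumer would be handling broker messages here


def health_status(consumer_thread):
    """Return (HTTP status code, JSON payload) based on the consumer thread."""
    if consumer_thread.is_alive():
        return 200, {'status': 'Healthy'}
    return 503, {'status': 'Consumer stopped'}


def make_handler(consumer_thread):
    """Build a request handler class bound to the given consumer thread."""
    class HealthHandler(BaseHTTPRequestHandler):
        def do_GET(self):
            if self.path != '/api/v1/health':
                self.send_error(404)
                return
            code, payload = health_status(consumer_thread)
            body = json.dumps(payload).encode('utf-8')
            self.send_response(code)
            self.send_header('Content-Type', 'application/json')
            self.send_header('Content-Length', str(len(body)))
            self.end_headers()
            self.wfile.write(body)

        def log_message(self, format, *args):
            pass  # silence per-request logging in this sketch

    return HealthHandler


def serve(port=8080):
    """Start the consumer thread, then serve health checks until Ctrl+C."""
    stop_event = threading.Event()
    # daemon=True: the consumer thread cannot keep the process alive
    # after the main thread (the HTTP server) has finished.
    consumer = threading.Thread(
        target=consume_forever, args=(stop_event,), daemon=True
    )
    consumer.start()
    server = ThreadingHTTPServer(('0.0.0.0', port), make_handler(consumer))
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print('Interrupted')
    finally:
        stop_event.set()
        server.server_close()
```

Calling serve(8080) starts the stand-in consumer and then blocks in the HTTP server until Ctrl+C. In the real service, the target of the thread would be start_rmq_connection from the answer, and health_status could be called from the Flask route in the same way.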