问题描述
架构
我计划通过 MQTT 将来自 IoT 节点的数据发布到 RabbitMQ 队列中。然后处理数据,需要将状态保存到Redis中。
当前实施
我为 RabbitMQ 启动了一个 docker 容器并将其配置为启用 MQTT(端口:1883)。
基于RabbitMQ's MQTT Plugin Documentation
- 来自 MQTT 端口的数据被发送到
amq.topic
Exchange 并订阅类似于 MQTT 主题的队列名称,其中/
被.
替换,例如hello/test
MQTT 主题 ->hello.test
RabbitMQ 队列。
通过 AMQP 端口的基本消耗
import argparse,sys,pika
def main():
args = parse_arguments()
# CLI TAKES IN broKER ParaMETERS in `args`
# Removed for brevity
broker_credentials = pika.PlainCredentials(args.rabbit_user,args.rabbit_pass)
print("Setting Up Connection with RabbitMQ broker")
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=args.rabbit_host,port=args.rabbit_port,credentials=broker_credentials
)
)
# Create Channel
print("Creating Channel")
channel = connection.channel()
# Declare the Exchange
print("Declaring the exchange to read incoming MQTT Messages")
# Exchange to read mqtt via RabbitMQ is always `amq.topic`
# Type of exchange is `topic` based
channel.exchange_declare(exchange='amq.topic',exchange_type='topic',durable=True)
# the Queue Name for MQTT Messages is the MQTT-TOPIC name where `/` is replaced by `.`
# Let RabbitMQ create the name for us
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
# Bind the Queue with some specific topic where you wish to get the data from
channel.queue_bind(exchange='amq.topic',queue=queue_name,routing_key=args.rabbit_topic)
# Start Consuming from Queue
channel.basic_consume(queue=queue_name,on_message_callback=consumer_callback,auto_ack=True)
print("Waiting for MQTT Payload")
channel.start_consuming()
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("CTRL+C pressed")
sys.exit(0)
要求
我只知道 Celery 并且正在研究它。在很多例子中通常是当外部脚本触发任务然后工作人员解决任务并将其保存到backend
(在我的情况下为Redis)
例如
app = Celery('tasks',broker='RABBITMQ_broKER_URL')
@app.task
def process_iot_data(incoming_data):
time.sleep(1.0)
# Do Some Necessary data processing and store the processed state in Redis
Celery 设计中的困惑
我浏览了很多博客,其中 Celery 任务与 REST API 一起使用,在调用 API 时,任务会排队并执行,状态保存在后端。
我找不到任何示例,在初始化 Celery(..)
应用程序期间,我可以实例化必要的 exchange
即 amq.topic
以及我通过上面的消费者代码使用 {{ 1}}。
当 RabbitMQ 队列中的数据被推送时,我无法意识到在任务排队的情况下使用 celery 的可能方法是什么。与发送 REST API 请求不同,我希望在相应的队列中插入数据后,使用 celery 任务处理队列中的传入数据。
这是使用 Celery 可以实现的,还是我应该坚持使用 pika
并在回调函数中编写内容?
瞄准
我希望进行一些模拟,在这种情况下,我可以将消费者扩展多倍,并尝试查看我的 dockerized 消费者应用程序能够承受多大的数据和处理量。
解决方法
简而言之 - 没有。
Celery 并非旨在处理发送到消息队列系统的任意数据。它旨在生成/消费包含序列化 Celery 任务详细信息的消息,以便消费者可以在另一端执行特定任务,并将结果放入结果后端。
但是,我坚信几乎任何您能想到的任意消息都可以(以这种方式或其他方式)包装到 Celery 任务中。但真正的问题是当您不希望 Celery 位于其中一个端(生产者或消费者)时。生产者可以使用方便的 send_task() 函数发送任务,而无需共享包含任务定义的代码。