问题描述
我正在尝试为RabbitMQ中的两个队列分配优先级,以便在从Queue2进行使用之前,我的工作人员将始终首先使用并清除Queue1中的所有消息。我使用的芹菜配置文件名为celeryconfig.py
,如下所示:
import ssl
broker_url="amqps://USR:PWD@URL//"
result_backend="db+postgresql://USR:PWD@BURL?sslmode=verify-full&sslrootcert=/usr/local/share/ca-certificates/MY_CACERT.crt"
include=["my_tasks"]
task_acks_late=True
task_default_rate_limit="150/m"
task_time_limit=300
worker_prefetch_multiplier=1
worker_max_tasks_per_child=2
timezone="UTC"
broker_use_ssl = {'keyfile': '/usr/local/share/private/MY_KEY.key','certfile': '/usr/local/share/ca-certificates/MY_CERT.crt','ca_certs': '/usr/local/share/ca-certificates/MY_CACERT.crt','cert_reqs': ssl.CERT_REQUIRED,'ssl_version': ssl.PROTOCOL_TLSv1_2}
目前我只有1个队列,这就是我启动芹菜工人的方式
celery -A celery_app worker -l info --config celeryconfig --concurrency=16 -n "%h:celery-worker" -O fair
我已经在https://docs.celeryproject.org/en/v4.3.0/userguide/routing.html#routing-options-rabbitmq-priorities处阅读了简短文档,但只提到设置 max 优先级,而没有告诉我如何为RabbitMQ中的每个队列设置优先级。
- RabbitMQ:3.7.17
- 芹菜:4.3.0
- Python:3.6.7
- 操作系统:Ubuntu 18.04.3 LTS仿生
有人可以阐明这一点吗?谢谢
解决方法
我对芹菜一点都不熟悉,但是其他系统可以根据队列或其他过滤器运行独立的工作程序。而且每个工作人员都可以拥有自己的配置,用于每秒消耗的消息,并发等。
您可以创建两个celery配置,其中一个配置为优先级为10,另一个优先级为5,并运行两个“实例”的芹菜。
这将更好地工作...在同一工作程序中,每个消息的优先级都不能很好地工作。