问题描述
我已经研究了多个SO问题,但是仍然没有找到解决方案,并且我已经在进行了几天的工作,因此我们将不胜感激!
我正在使用celery 4.4.0,SQS作为代理和Django 2.2,我的大部分任务都非常长(1到4个小时)。
这是启动工作程序的命令:
celery worker -A config.celeryconfig:app -Ofair --prefetch-multiplier=1 --max-tasks-per-child=2
我的Django配置文件中的配置如下:
CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_ACKS_LATE = True
task_acks_late = True # I wasn't sure what's the name of the ack late configuration.
broKER_TRANSPORT_OPTIONS = {
'polling_interval': 3,'region': 'us-east-1','visibility_timeout': 3600 # 1 hour,}
场景-假设我们有2个工人-1,2,每个工人都有一个子流程和两个任务a(超过1小时),b,c,我大约在同一时间派遣了他们:
工人1接任务a
工人1接任务c(但未执行)
工人2接任务b
工人2完成任务b
工人2接任务c(由于能见度超时)
工人2完成任务c
工人1完成任务a
工人1从任务c开始
工人1完成任务c
所以:
- 工作人员仍在预取任务。
- (更糟糕的是)同一任务可以执行两次(如果有,甚至执行更多)
更多的工作人员,执行时间比
visibility_timeout
长。在https://github.com/celery/celery/issues/4400中进行了详细讨论。
我已经意识到解决上述问题的方法是禁用预取行为,但是到目前为止,我还无法实现。
我很沮丧,因此,如果您对如何解决这个问题有任何想法-请让我知道!
注意事项:
*我在生产环境中也看到了这一点,其中每个工人有多个子流程。
*我以前使用倒计时来分派任务(这导致产生ETA任务),但是禁用了它,问题仍然存在。
- 我不使用result_backend,而是处理创建的数据库中模型上的所有内容。
*我不想设置CONCURRENCY=1
,因为我希望每台计算机上有一个工作线程,并且该计算机中有subprocess = cpu内核。
*我已经尝试了许多上述芹菜配置的组合,但是都没有用(上面列出的是基于此评论-https://stackoverflow.com/a/58958823)
*我不喜欢使用的另一种可能的解决方案是将visible_timeout增加到一个很大的数字。但是,这可能导致一项任务将所有长任务一次接一个地提交,并且在所有工作人员之间没有分配它们。
*不确定是否相关-我正在使用EC2机器上的ElasticBeanstalk部署celery。 *我目前正在考虑的另一种可能的解决方案-正在检查任务的状态(我们使用“待处理/正在进行中”等状态),并且仅在待处理状态时才继续-继续(由于可能,这可能无法解决整个用例种族条件,但应该可以解决其中的大部分问题。
解决方法
好吧,我从Celery文档中了解到,“ CELERYD_PREFETCH_MULTIPLIER”将仅更改行为以一次预取一个任务,但并不限制预取功能。因此,如果并发性> 1,它将仍然为每个工作线程预取1个任务。尝试设置--concurrency = 1
一次要预取多少条消息乘以并发进程数。默认值为4(每个进程四个消息)。