Celery 4.4.0-使用SQS作为代理时禁用预取

问题描述

我已经研究了多个SO问题,但是仍然没有找到解决方案,并且我已经在进行了几天的工作,因此我们将不胜感激!

我正在使用celery 4.4.0,SQS作为代理和Django 2.2,我的大部分任务都非常长(1到4个小时)。
这是启动工作程序的命令:

celery worker -A config.celeryconfig:app -Ofair --prefetch-multiplier=1 --max-tasks-per-child=2

我的Django配置文件中的配置如下:

CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_ACKS_LATE = True
task_acks_late = True # I wasn't sure what's the name of the ack late configuration.

broKER_TRANSPORT_OPTIONS = {
    'polling_interval': 3,'region': 'us-east-1','visibility_timeout': 3600 # 1 hour,}

场景-假设我们有2个工人-1,2,每个工人都有一个子流程和两个任务a(超过1小时),b,c,我大约在同一时间派遣了他们:
工人1接任务a
工人1接任务c(但未执行)
工人2接任务b
工人2完成任务b
工人2接任务c(由于能见度超时)
工人2完成任务c
工人1完成任务a
工人1从任务c开始
工人1完成任务c

所以:

  1. 工作人员仍在预取任务。
  2. (更糟糕的是)同一任务可以执行两次(如果有,甚至执行更多) 更多的工作人员,执行时间比visibility_timeout长。在https://github.com/celery/celery/issues/4400中进行了详细讨论。

我已经意识到解决上述问题的方法是禁用预取行为,但是到目前为止,我还无法实现。

我很沮丧,因此,如果您对如何解决这个问题有任何想法-请让我知道!

注意事项:
*我在生产环境中也看到了这一点,其中每个工人有多个子流程。
*我以前使用倒计时来分派任务(这导致产生ETA任务),但是禁用了它,问题仍然存在。

  • 我不使用result_backend,而是处理创建的数据库中模型上的所有内容
    *我不想设置CONCURRENCY=1,因为我希望每台计算机上有一个工作线程,并且该计算机中有subprocess = cpu内核。
    *我已经尝试了许多上述芹菜配置的组合,但是都没有用(上面列出的是基于此评论-https://stackoverflow.com/a/58958823
    *我不喜欢使用的另一种可能的解决方案是将visible_timeout增加一个很大的数字。但是,这可能导致一项任务将所有长任务一次接一个地提交,并且在所有工作人员之间没有分配它们。
    *不确定是否相关-我正在使用EC2机器上的ElasticBeanstalk部署celery。 *我目前正在考虑的另一种可能的解决方案-正在检查任务的状态(我们使用“待处理/正在进行中”等状态),并且仅在待处理状态时才继续-继续(由于可能,这可能无法解决整个用例种族条件,但应该可以解决其中的大部分问题。

解决方法

好吧,我从Celery文档中了解到,“ CELERYD_PREFETCH_MULTIPLIER”将仅更改行为以一次预取一个任务,但并不限制预取功能。因此,如果并发性> 1,它将仍然为每个工作线程预取1个任务。尝试设置--concurrency = 1

一次要预取多少条消息乘以并发进程数。默认值为4(每个进程四个消息)。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...