每 20 秒运行一次的 Celery 任务重叠并在最后一个完成之前开始

问题描述

我有一个 celery 任务,每 20 秒在 3 个实例中运行,所有实例都连接到一个数据库。问题是处理程序有时会触发两次任务重叠。当任务重叠时,过滤的项目似乎没有更新:

@periodic_task(run_every=timedelta(seconds=20))
def process_webhook_transactions():
    """Process webhook transactions"""
    transactions = WebhookTransaction.objects.filter(status=WebhookTransaction.UNPROCESSED)
    for transaction in transactions:
        data = transaction.body
        event = data.get('event_category')
        if event is None:
            transaction.status = WebhookTransaction.ERROR
            transaction.save()
            continue
        
        
        handler = WEBHOOK_HANDLERS.get(event,default_handler)
        success = handler(data)

        if success:
            transaction.status = WebhookTransaction.PROCESSED
        else:
            transaction.status = WebhookTransaction.ERROR
        transaction.save()

避免这种情况的最佳方法是什么?

解决方法

当 3 个工作人员同时运行该任务时,您可以使用 select_for_updateskip_locked 来防止重复行。像这样:

transactions = WebhookTransaction.objects.filter(status=WebhookTransaction.UNPROCESSED)
transactions = transactions.select_for_update(skip_locked=True,of=("self",))

但是这种方法会使一个 worker 实例比其他 worker 更努力地工作(第一个任务选择所有事务,而其他任务没有多少事务剩余)。您可以创建一个同样在 20 秒内运行的新任务,该任务会将所有事务拆分为更小的块(可能是 10-20 个?),然后用这些块触发 process_webhook_transactions

如果 handler = WEBHOOK_HANDLERS.get(event,default_handler) 是异步的,我认为拆分块方法也不错,因为您可以并发运行它以提高任务速度。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...