问题描述
我有一个 celery 任务,每 20 秒在 3 个实例中运行,所有实例都连接到一个数据库。问题是处理程序有时会触发两次任务重叠。当任务重叠时,过滤的项目似乎没有更新:
@periodic_task(run_every=timedelta(seconds=20))
def process_webhook_transactions():
"""Process webhook transactions"""
transactions = WebhookTransaction.objects.filter(status=WebhookTransaction.UNPROCESSED)
for transaction in transactions:
data = transaction.body
event = data.get('event_category')
if event is None:
transaction.status = WebhookTransaction.ERROR
transaction.save()
continue
handler = WEBHOOK_HANDLERS.get(event,default_handler)
success = handler(data)
if success:
transaction.status = WebhookTransaction.PROCESSED
else:
transaction.status = WebhookTransaction.ERROR
transaction.save()
避免这种情况的最佳方法是什么?
解决方法
当 3 个工作人员同时运行该任务时,您可以使用 select_for_update
和 skip_locked
来防止重复行。像这样:
transactions = WebhookTransaction.objects.filter(status=WebhookTransaction.UNPROCESSED)
transactions = transactions.select_for_update(skip_locked=True,of=("self",))
但是这种方法会使一个 worker 实例比其他 worker 更努力地工作(第一个任务选择所有事务,而其他任务没有多少事务剩余)。您可以创建一个同样在 20 秒内运行的新任务,该任务会将所有事务拆分为更小的块(可能是 10-20 个?),然后用这些块触发 process_webhook_transactions
。
如果 handler = WEBHOOK_HANDLERS.get(event,default_handler)
是异步的,我认为拆分块方法也不错,因为您可以并发运行它以提高任务速度。