远程芹菜工作者完成后通知或回调烧瓶

问题描述

我正在两台不同的计算机上运行celery客户端(Flask)和worker,现在一旦worker完成任务,我就需要在客户端回调一个函数。这可能吗?

芹菜客户:-

celery_app=Celery('test_multihost',broker='amqp://test:test@<worker_ip>/test_host',backend='rpc')
result= testMethod1.apply_async((param1,param2,param3),link=testMethod2.s())

@celery_app.task
def testMethod2():
    #testMethod2 body.

芹菜工人:-

celery_app=Celery('test_multihost',backend='rpc')
@celery_app.task
def testMethod1():
   #testMethod1 body

但是问题是函数testMethod2在芹菜工人端而不是在客户端端执行。

反正我可以在客户端回调该方法吗?

解决方法

执行此操作的一种方法是让Celery将其结果写入数据库表,然后使用Flask通过重复查询数据库来轮询任务的结果。类似的构造可能是在Redis中保留已完成任务的记录,但是要旨是相同的。

您要向用户触发完成消息吗?如果可以通过电子邮件/短信通知,则可以让Celery当然处理。

如果您需要启动一些Flask进程-并且出于某种原因它确实需要进入Flask的生态系统中-请使用带有requests模块的worker来调用Flask正在侦听的端点。

,

我用芹菜信号中的@after_task_publish解决了这个问题。 代码片段如下:-

@after_task_publish.connect(sender=<registered_celery_task>)
def testMethod2(sender=None,headers=None,body=None,**kwargs):
    #callback body

在远程计算机上完成celery worker之后,将调用testMethod2。 在这里,我可以使用 headers 参数访问celery worker的结果。