问题描述
我正在两台不同的计算机上运行celery客户端(Flask)和worker,现在一旦worker完成任务,我就需要在客户端回调一个函数。这可能吗?
芹菜客户:-
celery_app=Celery('test_multihost',broker='amqp://test:test@<worker_ip>/test_host',backend='rpc')
result= testMethod1.apply_async((param1,param2,param3),link=testMethod2.s())
@celery_app.task
def testMethod2():
#testMethod2 body.
芹菜工人:-
celery_app=Celery('test_multihost',backend='rpc')
@celery_app.task
def testMethod1():
#testMethod1 body
但是问题是函数testMethod2在芹菜工人端而不是在客户端端执行。
反正我可以在客户端回调该方法吗?
解决方法
执行此操作的一种方法是让Celery将其结果写入数据库表,然后使用Flask通过重复查询数据库来轮询任务的结果。类似的构造可能是在Redis中保留已完成任务的记录,但是要旨是相同的。
您要向用户触发完成消息吗?如果可以通过电子邮件/短信通知,则可以让Celery当然处理。
如果您需要启动一些Flask进程-并且出于某种原因它确实需要进入Flask的生态系统中-请使用带有requests
模块的worker来调用Flask正在侦听的端点。
我用芹菜信号中的@after_task_publish解决了这个问题。 代码片段如下:-
@after_task_publish.connect(sender=<registered_celery_task>)
def testMethod2(sender=None,headers=None,body=None,**kwargs):
#callback body
在远程计算机上完成celery worker之后,将调用testMethod2。 在这里,我可以使用 headers 参数访问celery worker的结果。