Why won't Celery return child results?

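Problem description

OK. I'm stuck. Thanks in advance for any suggestions.

I can't get Celery to return the result of a linked subtask. It executes the task, then hangs waiting on result.get().

Here is my test worker, celery_test.py: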

from celery import Celery

app = Celery('tasks', backend='rpc://', broker='amqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y

All my tests return successfully until I request the child with result.children[0].get(). If I use .get(timeout=1), it times out without returning a result.

Testing the add function:

from celery_test import add

result = add.delay(4,4)
result.get()

>>>8

result = (add.s(2,2) | add.s(5))()
result.get()

>>>9

result = add.apply_async((2,2),link=add.s(5))
result.get()

>>>4

result.children
>>>[<AsyncResult: 83259951-1382-4a36-a3e5-8c61540c1f7c>]

result.children[0]
>>><AsyncResult: 83259951-1382-4a36-a3e5-8c61540c1f7c>

result.children[0].get()
**HANGS HERE**

The Celery worker console shows the correct computation and result, so I think it must be an RPC callback problem?

Celery worker console with debug logging enabled:

[2021-01-21 11:47:12,721: INFO/MainProcess] Received task: celery_test.add[4bb459f0-29d4-4c75-a088-eaa8ce3af089]  
[2021-01-21 11:47:12,722: DEBUG/MainProcess] TaskPool: Apply <function _trace_task_ret at 0x7fd0f674ec10> (args:('celery_test.add','4bb459f0-29d4-4c75-a088-eaa8ce3af089',{'lang': 'py','task': 'celery_test.add','id': '4bb459f0-29d4-4c75-a088-eaa8ce3af089','shadow': None,'eta': None,'expires': None,'group': None,'group_index': None,'retries': 0,'timelimit': [None,None],'root_id': '4bb459f0-29d4-4c75-a088-eaa8ce3af089','parent_id': None,'argsrepr': '(2,2)','kwargsrepr': '{}','origin': 'gen795981@greg-threadripper','reply_to': 'bd95020d-13a7-33cd-929f-a9b580423335','correlation_id': '4bb459f0-29d4-4c75-a088-eaa8ce3af089','hostname': 'celery@greg-threadripper','delivery_info': {'exchange': '','routing_key': 'celery','priority': 0,'redelivered': False},'args': [2,2],'kwargs': {}},'[[2,{},{"callbacks": null,"errbacks": null,"chain": [{"task": "celery_test.add","args": [5],"kwargs": {},"options": {"task_id": "be83f3f4-95bc-4635-b00c-f217d1fc7c79","reply_to": "bd95020d-13a7-33cd-929f-a9b580423335"},"subtask_type": null,"chord_size": null,"immutable": false}],"chord": null}]',... kwargs:{})
[2021-01-21 11:47:12,723: DEBUG/MainProcess] Task accepted: celery_test.add[4bb459f0-29d4-4c75-a088-eaa8ce3af089] pid:822387
[2021-01-21 11:47:12,726: INFO/ForkPoolWorker-64] Task celery_test.add[4bb459f0-29d4-4c75-a088-eaa8ce3af089] succeeded in 0.003127063042484224s: 4
[2021-01-21 11:47:12,726: INFO/MainProcess] Received task: celery_test.add[be83f3f4-95bc-4635-b00c-f217d1fc7c79]  
[2021-01-21 11:47:12,726: DEBUG/MainProcess] TaskPool: Apply <function _trace_task_ret at 0x7fd0f674ec10> (args:('celery_test.add','be83f3f4-95bc-4635-b00c-f217d1fc7c79','id': 'be83f3f4-95bc-4635-b00c-f217d1fc7c79','parent_id': '4bb459f0-29d4-4c75-a088-eaa8ce3af089','argsrepr': '(4,5)','origin': 'gen822387@greg-threadripper','correlation_id': 'be83f3f4-95bc-4635-b00c-f217d1fc7c79','args': [4,5],'[[4,"chain": [],'application/json','utf-8') kwargs:{})
[2021-01-21 11:47:12,727: INFO/ForkPoolWorker-64] Task celery_test.add[be83f3f4-95bc-4635-b00c-f217d1fc7c79] succeeded in 0.00030225294176489115s: 9
[2021-01-21 11:47:12,727: DEBUG/MainProcess] Task accepted: celery_test.add[be83f3f4-95bc-4635-b00c-f217d1fc7c79] pid:822387

Error/hang. When I interrupt execution, I see this traceback:

---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-19-e82522ea82e7> in <module>
----> 1 result.children[0].get()

/usr/local/lib/python3.8/dist-packages/celery/result.py in get(self,timeout,propagate,interval,no_ack,follow_parents,callback,on_message,on_interval,disable_sync_subtasks,EXCEPTION_STATES,PROPAGATE_STATES)
    221 
    222         self.backend.add_pending_result(self)
--> 223         return self.backend.wait_for_pending(
    224             self,timeout=timeout,
    225             interval=interval,

/usr/local/lib/python3.8/dist-packages/celery/backends/asynchronous.py in wait_for_pending(self,result,**kwargs)
    197                          callback=None,propagate=True,**kwargs):
    198         self._ensure_not_eager()
--> 199         for _ in self._wait_for_pending(result,**kwargs):
    200             pass
    201         return result.maybe_throw(callback=callback,propagate=propagate)

/usr/local/lib/python3.8/dist-packages/celery/backends/asynchronous.py in _wait_for_pending(self,**kwargs)
    263         prev_on_m,self.on_message = self.on_message,on_message
    264         try:
--> 265             for _ in self.drain_events_until(
    266                     result.on_ready,
    267                     on_interval=on_interval):

/usr/local/lib/python3.8/dist-packages/celery/backends/asynchronous.py in drain_events_until(self,p,wait)
     52                 raise socket.timeout()
     53             try:
---> 54                 yield self.wait_for(p,wait,timeout=interval)
     55             except socket.timeout:
     56                 pass

/usr/local/lib/python3.8/dist-packages/celery/backends/asynchronous.py in wait_for(self,timeout)
     61 
     62     def wait_for(self,timeout=None):
---> 63         wait(timeout=timeout)
     64 
     65 

/usr/local/lib/python3.8/dist-packages/celery/backends/rpc.py in drain_events(self,timeout)
     57     def drain_events(self,timeout=None):
     58         if self._connection:
---> 59             return self._connection.drain_events(timeout=timeout)
     60         elif timeout:
     61             time.sleep(timeout)

/usr/local/lib/python3.8/dist-packages/kombu/connection.py in drain_events(self,**kwargs)
    316             socket.timeout: if the timeout is exceeded.
    317         """
--> 318         return self.transport.drain_events(self.connection,**kwargs)
    319 
    320     def maybe_close_channel(self,channel):

/usr/local/lib/python3.8/dist-packages/kombu/transport/pyamqp.py in drain_events(self,connection,**kwargs)
     99 
    100     def drain_events(self,**kwargs):
--> 101         return connection.drain_events(**kwargs)
    102 
    103     def _collect(self,connection):

/usr/local/lib/python3.8/dist-packages/amqp/connection.py in drain_events(self,timeout)
    520     def drain_events(self,timeout=None):
    521         # read until message is ready
--> 522         while not self.blocking_read(timeout):
    523             pass
    524 

/usr/local/lib/python3.8/dist-packages/amqp/connection.py in blocking_read(self,timeout)
    525     def blocking_read(self,timeout=None):
    526         with self.transport.having_timeout(timeout):
--> 527             frame = self.transport.read_frame()
    528         return self.on_inbound_frame(frame)
    529 

/usr/local/lib/python3.8/dist-packages/amqp/transport.py in read_frame(self,unpack)
    266         read_frame_buffer = EMPTY_BUFFER
    267         try:
--> 268             frame_header = read(7,True)
    269             read_frame_buffer += frame_header
    270             frame_type,channel,size = unpack('>BHI',frame_header)

/usr/local/lib/python3.8/dist-packages/amqp/transport.py in _read(self,n,initial,_errnos)
    455             while len(rbuf) < n:
    456                 try:
--> 457                     s = recv(n - len(rbuf))
    458                 except OSError as exc:
    459                     if exc.errno in _errnos:

KeyboardInterrupt: 

Celery report:

 -------------- celery@greg-threadripper v5.0.5 (singularity)
--- ***** ----- 
-- ******* ---- Linux-5.8.0-33-generic-x86_64-with-glibc2.32 2021-01-21 11:43:57
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x7fd0f665d760
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     rpc://
- *** --- * --- .> concurrency: 64 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

Solution

No confirmed solution to this problem has been found yet.
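
One possible explanation, offered as a hypothesis rather than a confirmed diagnosis: the Celery documentation notes that with the rpc:// backend a result can only be retrieved by the client that initiated the task. A callback attached with link= is published by the worker after the parent finishes (the debug log above shows the follow-up task's origin as gen822387, matching the worker pid), so its result may be routed back to the worker rather than to the calling session. The toy model below uses plain Python and no Celery; reply_queues, publish_result, and fetch are hypothetical names invented for illustration and are not Celery internals.

```python
from collections import defaultdict

# Toy model only: these names are hypothetical and are not Celery internals.
reply_queues = defaultdict(dict)  # queue name -> {task_id: result}


def publish_result(reply_to, task_id, value):
    """Deliver a result to the reply queue named by whoever published the task."""
    reply_queues[reply_to][task_id] = value


def fetch(own_queue, task_id):
    """A consumer only sees results that landed in its own reply queue."""
    # None stands in for "still waiting".
    return reply_queues[own_queue].get(task_id)


CLIENT, WORKER = "client-queue", "worker-queue"

# Parent task: published by the client, so its result goes to the client.
publish_result(CLIENT, "parent", 2 + 2)

# Linked child: published by the worker once the parent finishes, so in this
# model its result goes to the worker's queue instead of the client's.
publish_result(WORKER, "child", 4 + 5)

parent_seen = fetch(CLIENT, "parent")  # the client receives the parent result
child_seen = fetch(CLIENT, "child")    # the client never receives the child result
```

In the model, the client's lookup for the child keeps returning None, which corresponds to the get() call that hangs in the question.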
