问题描述
好的。我被困住了。感谢您的任何建议。
我无法让 celery 返回链接子任务的结果。它执行任务,然后挂起等待result.get().
这是我的测试工作者 celery_test.py
from celery import Celery
app = Celery('tasks',backend='rpc://',broker='amqp://guest@localhost//')
@app.task
def add(x,y):
return x + y
我所有的测试都成功返回,直到我用 result.children[0].get() 请求孩子。如果我使用 .get(timeout=1) 它会超时而不得到结果。
from celery_test import add
result = add.delay(4,4)
result.get()
>>>4
result = (add.s(2,2) | add.s(5))()
result.get()
>>>9
result = add.apply_async((2,2),link=add.s(5))
result.get()
>>>4
result.children
>>>[<AsyncResult: 83259951-1382-4a36-a3e5-8c61540c1f7c>]
result.children[0]
>>><AsyncResult: 83259951-1382-4a36-a3e5-8c61540c1f7c>
result.children[0].get()
**HANGS HERE**
celery worker 控制台显示正确的计算和结果,所以我认为它一定是 RPC 回调问题?
带有调试功能的 Celery Worker 控制台:
[2021-01-21 11:47:12,721: INFO/MainProcess] Received task: celery_test.add[4bb459f0-29d4-4c75-a088-eaa8ce3af089]
[2021-01-21 11:47:12,722: DEBUG/MainProcess] TaskPool: Apply <function _trace_task_ret at 0x7fd0f674ec10> (args:('celery_test.add','4bb459f0-29d4-4c75-a088-eaa8ce3af089',{'lang': 'py','task': 'celery_test.add','id': '4bb459f0-29d4-4c75-a088-eaa8ce3af089','shadow': None,'eta': None,'expires': None,'group': None,'group_index': None,'retries': 0,'timelimit': [None,None],'root_id': '4bb459f0-29d4-4c75-a088-eaa8ce3af089','parent_id': None,'argsrepr': '(2,2)','kwargsrepr': '{}','origin': 'gen795981@greg-threadripper','reply_to': 'bd95020d-13a7-33cd-929f-a9b580423335','correlation_id': '4bb459f0-29d4-4c75-a088-eaa8ce3af089','hostname': 'celery@greg-threadripper','delivery_info': {'exchange': '','routing_key': 'celery','priority': 0,'redelivered': False},'args': [2,2],'kwargs': {}},'[[2,{},{"callbacks": null,"errbacks": null,"chain": [{"task": "celery_test.add","args": [5],"kwargs": {},"options": {"task_id": "be83f3f4-95bc-4635-b00c-f217d1fc7c79","reply_to": "bd95020d-13a7-33cd-929f-a9b580423335"},"subtask_type": null,"chord_size": null,"immutable": false}],"chord": null}]',... kwargs:{})
[2021-01-21 11:47:12,723: DEBUG/MainProcess] Task accepted: celery_test.add[4bb459f0-29d4-4c75-a088-eaa8ce3af089] pid:822387
[2021-01-21 11:47:12,726: INFO/ForkPoolWorker-64] Task celery_test.add[4bb459f0-29d4-4c75-a088-eaa8ce3af089] succeeded in 0.003127063042484224s: 4
[2021-01-21 11:47:12,726: INFO/MainProcess] Received task: celery_test.add[be83f3f4-95bc-4635-b00c-f217d1fc7c79]
[2021-01-21 11:47:12,726: DEBUG/MainProcess] TaskPool: Apply <function _trace_task_ret at 0x7fd0f674ec10> (args:('celery_test.add','be83f3f4-95bc-4635-b00c-f217d1fc7c79','id': 'be83f3f4-95bc-4635-b00c-f217d1fc7c79','parent_id': '4bb459f0-29d4-4c75-a088-eaa8ce3af089','argsrepr': '(4,5)','origin': 'gen822387@greg-threadripper','correlation_id': 'be83f3f4-95bc-4635-b00c-f217d1fc7c79','args': [4,5],'[[4,"chain": [],'application/json','utf-8') kwargs:{})
[2021-01-21 11:47:12,727: INFO/ForkPoolWorker-64] Task celery_test.add[be83f3f4-95bc-4635-b00c-f217d1fc7c79] succeeded in 0.00030225294176489115s: 9
[2021-01-21 11:47:12,727: DEBUG/MainProcess] Task accepted: celery_test.add[be83f3f4-95bc-4635-b00c-f217d1fc7c79] pid:822387
---------------------------------------------------------------------------
KeyboardInterrupt Traceback (most recent call last)
<ipython-input-19-e82522ea82e7> in <module>
----> 1 result.children[0].get()
/usr/local/lib/python3.8/dist-packages/celery/result.py in get(self,timeout,propagate,interval,no_ack,follow_parents,callback,on_message,on_interval,disable_sync_subtasks,EXCEPTION_STATES,PROPAGATE_STATES)
221
222 self.backend.add_pending_result(self)
--> 223 return self.backend.wait_for_pending(
224 self,timeout=timeout,225 interval=interval,/usr/local/lib/python3.8/dist-packages/celery/backends/asynchronous.py in wait_for_pending(self,result,**kwargs)
197 callback=None,propagate=True,**kwargs):
198 self._ensure_not_eager()
--> 199 for _ in self._wait_for_pending(result,**kwargs):
200 pass
201 return result.maybe_throw(callback=callback,propagate=propagate)
/usr/local/lib/python3.8/dist-packages/celery/backends/asynchronous.py in _wait_for_pending(self,**kwargs)
263 prev_on_m,self.on_message = self.on_message,on_message
264 try:
--> 265 for _ in self.drain_events_until(
266 result.on_ready,267 on_interval=on_interval):
/usr/local/lib/python3.8/dist-packages/celery/backends/asynchronous.py in drain_events_until(self,p,wait)
52 raise socket.timeout()
53 try:
---> 54 yield self.wait_for(p,wait,timeout=interval)
55 except socket.timeout:
56 pass
/usr/local/lib/python3.8/dist-packages/celery/backends/asynchronous.py in wait_for(self,timeout)
61
62 def wait_for(self,timeout=None):
---> 63 wait(timeout=timeout)
64
65
/usr/local/lib/python3.8/dist-packages/celery/backends/rpc.py in drain_events(self,timeout)
57 def drain_events(self,timeout=None):
58 if self._connection:
---> 59 return self._connection.drain_events(timeout=timeout)
60 elif timeout:
61 time.sleep(timeout)
/usr/local/lib/python3.8/dist-packages/kombu/connection.py in drain_events(self,**kwargs)
316 socket.timeout: if the timeout is exceeded.
317 """
--> 318 return self.transport.drain_events(self.connection,**kwargs)
319
320 def maybe_close_channel(self,channel):
/usr/local/lib/python3.8/dist-packages/kombu/transport/pyamqp.py in drain_events(self,connection,**kwargs)
99
100 def drain_events(self,**kwargs):
--> 101 return connection.drain_events(**kwargs)
102
103 def _collect(self,connection):
/usr/local/lib/python3.8/dist-packages/amqp/connection.py in drain_events(self,timeout)
520 def drain_events(self,timeout=None):
521 # read until message is ready
--> 522 while not self.blocking_read(timeout):
523 pass
524
/usr/local/lib/python3.8/dist-packages/amqp/connection.py in blocking_read(self,timeout)
525 def blocking_read(self,timeout=None):
526 with self.transport.having_timeout(timeout):
--> 527 frame = self.transport.read_frame()
528 return self.on_inbound_frame(frame)
529
/usr/local/lib/python3.8/dist-packages/amqp/transport.py in read_frame(self,unpack)
266 read_frame_buffer = EMPTY_BUFFER
267 try:
--> 268 frame_header = read(7,True)
269 read_frame_buffer += frame_header
270 frame_type,channel,size = unpack('>BHI',frame_header)
/usr/local/lib/python3.8/dist-packages/amqp/transport.py in _read(self,n,initial,_errnos)
455 while len(rbuf) < n:
456 try:
--> 457 s = recv(n - len(rbuf))
458 except OSError as exc:
459 if exc.errno in _errnos:
KeyboardInterrupt:
芹菜报告:
-------------- celery@greg-threadripper v5.0.5 (singularity)
--- ***** -----
-- ******* ---- Linux-5.8.0-33-generic-x86_64-with-glibc2.32 2021-01-21 11:43:57
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x7fd0f665d760
- ** ---------- .> transport: amqp://guest:**@localhost:5672//
- ** ---------- .> results: rpc://
- *** --- * --- .> concurrency: 64 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)