问题描述
我有以下项目目录:
azima:
__init.py
main.py
tasks.py
monitor.py
tasks.py
from .main import app
@app.task
def add(x,y):
return x + y
@app.task
def mul(x,y):
return x * y
@app.task
def xsum(numbers):
return sum(numbers)
main.py
from celery import Celery
app = Celery('azima',backend='redis://localhost:6379/0',broker='redis://localhost:6379/0',include=['azima.tasks'])
# Optional configuration,see the application user guide.
app.conf.update(
result_expires=3600,)
if __name__ == '__main__':
app.start()
监视器.py
from .main import app
def my_monitor(app):
state = app.events.State()
def announce_Failed_tasks(event):
state.event(event)
task = state.tasks.get(event['uuid'])
print(f'TASK Failed: {task.name}[{task.uuid}]')
def announce_succeeded_tasks(event):
print('task succeeded')
state.event(event)
task = state.tasks.get(event['uuid'])
print(f'TASK SUCCEEDED: {task.name}[{task.uuid}]')
def worker_online_handler(event):
state.event(event)
print("New worker gets online")
print(event['hostname'],event['timestamp'],event['freq'],event['sw_ver'])
with app.connection() as connection:
recv = app.events.Receiver(connection,handlers={
'task-Failed': announce_Failed_tasks,'task-succeeded': announce_succeeded_tasks,'worker-online': worker_online_handler,'*': state.event,})
recv.capture(limit=None,timeout=None,wakeup=True)
if __name__ == '__main__':
# app = Celery('azima')
my_monitor(app)
开始使用 celery worker
celery -A azima.main worker -l INFO
然后开始monitor.py
python -m azima.monitor
但是只有 worker-online
事件被触发,而 task-succeeded
等其他事件没有被触发或处理。
我在这里遗漏了什么?
解决方法
使用 cli option task-
或 -E
启用工人 --task-events
组事件并尝试捕获所有事件:
def my_monitor(app):
def on_event(event):
print("Event.type",event.get('type'))
with app.connection() as connection:
recv = app.events.Receiver(connection,handlers={'*': on_event})
recv.capture(limit=None,timeout=None,wakeup=True)
,
默认情况下,Celery worker 不发送事件。但是,与大多数有用的功能一样,它可以通过在您的配置中启用 worker_send_task_events 或使用 -E
标志运行 Celery 工作器来进行配置。
try_interval = 1
while True:
try:
try_interval *= 2
with self.capp.connection() as conn:
recv = EventReceiver(conn,handlers={"*": self.on_event},app=self.capp)
try_interval = 1
logger.debug("Capturing events...")
recv.capture(limit=None,wakeup=True)
except (KeyboardInterrupt,SystemExit):
try:
import _thread as thread
except ImportError:
import thread
thread.interrupt_main()
except Exception as e:
logger.error("Failed to capture events: '%s',"
"trying again in %s seconds.",e,try_interval)
logger.debug(e,exc_info=True)
time.sleep(try_interval)
有两个区别:
- 您的
app
中缺少芹菜EventReceiver
。 - 无限循环 (
while True
) 虽然我假设capture
方法正在阻塞并等待事件,并且循环只是为了以防万一。