Python Celery monitoring events are not triggered

Problem description

I have the following project layout:

azima:
    __init__.py
    main.py
    tasks.py
    monitor.py

tasks.py

from .main import app

@app.task
def add(x, y):
    return x + y

@app.task
def mul(x, y):
    return x * y

@app.task
def xsum(numbers):
    return sum(numbers)

main.py

from celery import Celery

app = Celery(
    'azima',
    backend='redis://localhost:6379/0',
    broker='redis://localhost:6379/0',
    include=['azima.tasks'],
)

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

monitor.py

from .main import app

def my_monitor(app):
    state = app.events.State()

    def announce_failed_tasks(event):
        state.event(event)
        task = state.tasks.get(event['uuid'])

        print(f'TASK FAILED: {task.name}[{task.uuid}]')

    def announce_succeeded_tasks(event):
        print('task succeeded')
        state.event(event)
        task = state.tasks.get(event['uuid'])

        print(f'TASK SUCCEEDED: {task.name}[{task.uuid}]')

    def worker_online_handler(event):
        state.event(event)
        print("New worker gets online")
        print(event['hostname'], event['timestamp'], event['freq'], event['sw_ver'])

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
            'task-failed': announce_failed_tasks,
            'task-succeeded': announce_succeeded_tasks,
            'worker-online': worker_online_handler,
            '*': state.event,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
    # app = Celery('azima')
    my_monitor(app)

I start the Celery worker with:

celery -A azima.main worker -l INFO

Then I start monitor.py:

python -m azima.monitor
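To generate task events at all, tasks also have to be sent to the worker, for example from a separate Python shell; the calls below are only illustrative and not from the original post:

from azima.tasks import add, mul, xsum

# Each call publishes a task to the broker; once the worker runs it,
# a task-succeeded (or task-failed) event should be emitted.
add.delay(2, 3)
mul.delay(4, 5)
xsum.delay([1, 2, 3])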

But only the worker-online event is triggered; task-succeeded and the other events are never triggered or handled.


What am I missing here?

Solution

Enable the worker's task event group with the CLI option -E (--task-events) and try to capture all events first:

def my_monitor(app):
    def on_event(event):
        print("Event.type", event.get('type'))

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={'*': on_event})
        recv.capture(limit=None, timeout=None, wakeup=True)
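With task events enabled, the worker is started with the -E (--task-events) flag, e.g.:

celery -A azima.main worker -l INFO -E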

By default, the Celery worker does not send events. But, like most useful features, this can be configured: enable worker_send_task_events in your configuration, or run the Celery worker with the -E flag.
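The configuration route would look roughly like this in main.py (a minimal sketch; worker_send_task_events is the setting named above, task_send_sent_event is optional and only needed if you also want task-sent events):

app.conf.update(
    result_expires=3600,
    worker_send_task_events=True,  # let the worker emit task-received/started/succeeded/failed events
    task_send_sent_event=True,     # optional: also emit task-sent events from the publisher
)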


Comparing your code with the Flower source code:

try_interval = 1
while True:
    try:
        try_interval *= 2

        with self.capp.connection() as conn:
            recv = EventReceiver(conn, handlers={"*": self.on_event}, app=self.capp)
            try_interval = 1
            logger.debug("Capturing events...")
            recv.capture(limit=None, wakeup=True)
    except (KeyboardInterrupt, SystemExit):
        try:
            import _thread as thread
        except ImportError:
            import thread
        thread.interrupt_main()
    except Exception as e:
        logger.error("Failed to capture events: '%s', "
                     "trying again in %s seconds.", e, try_interval)
        logger.debug(e, exc_info=True)
        time.sleep(try_interval)

There are two differences:

  1. Your EventReceiver is not passed the Celery app (Flower passes app=self.capp explicitly).
  2. The infinite loop (while True), although I assume the capture method blocks while waiting for events and the loop is only there as a safeguard to reconnect after failures; see the sketch below.
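Putting both points together, a hedged sketch of what monitor.py could look like with a Flower-style reconnect loop (the backoff handling mirrors the Flower snippet above; none of this is verbatim from the original answer):

import time

from .main import app

def my_monitor(app):
    state = app.events.State()

    def on_event(event):
        # Keep the in-memory state up to date and print every event type seen.
        state.event(event)
        print('Event:', event.get('type'))

    try_interval = 1
    while True:
        try:
            try_interval *= 2
            with app.connection() as connection:
                # app.events.Receiver is already bound to the app,
                # mirroring Flower's explicit app=self.capp argument.
                recv = app.events.Receiver(connection, handlers={'*': on_event})
                try_interval = 1
                recv.capture(limit=None, timeout=None, wakeup=True)
        except (KeyboardInterrupt, SystemExit):
            raise
        except Exception as exc:
            print(f"Failed to capture events: {exc!r}, retrying in {try_interval} seconds.")
            time.sleep(try_interval)

if __name__ == '__main__':
    my_monitor(app)

Remember that the worker still has to be started with -E (or with worker_send_task_events enabled), otherwise only worker heartbeat/online events will ever arrive.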