问题描述
以下是文档中的代码:
from celery.exceptions import SoftTimeLimitExceeded
@celery.task(soft_time_limit=15,time_limit=20)
def mytask():
try:
return do_work()
except SoftTimeLimitExceeded:
cleanup_in_a_hurry()
问题是 celery 如何允许捕获函数内部的异常。
如果它执行 my_task
并引发 SoftTimeLimitExceeded
,这个异常是如何在函数内部传播的?
另外,为什么不能在函数内部捕获TimeLimitExceeded
?
谢谢。
解决方法
SoftTimeLimit 的存在正是出于这个原因——因此您可以捕获异常并处理它。如果达到限制,硬限制实际上会阻止任务运行。我认为这是故意(我会正确地添加)这样设计的,所以我们开发人员不会把事情搞砸。
以下是如何捕获 SoftTimeLimitExceeded 异常 (https://github.com/scoringengine/scoringengine/blob/master/scoring_engine/engine/execute_command.py) 的示例:
from scoring_engine.celery_app import celery_app
from celery.exceptions import SoftTimeLimitExceeded
import subprocess
from scoring_engine.logger import logger
@celery_app.task(name='execute_command',acks_late=True,reject_on_worker_lost=True,soft_time_limit=30)
def execute_command(job):
output = ""
# Disable duplicate celery log messages
if logger.propagate:
logger.propagate = False
logger.info("Running cmd for " + str(job))
try:
cmd_result = subprocess.run(
job['command'],shell=True,stdout=subprocess.PIPE,stderr=subprocess.STDOUT
)
output = cmd_result.stdout.decode("utf-8")
job['errored_out'] = False
except SoftTimeLimitExceeded:
job['errored_out'] = True
job['output'] = output
return job