问题描述
我有一个 api 端点,它触发 celery 任务,完成后,它将结果写入 AsyncWebSocketConsumer 组(只有一个用户会在它上面)。这适用于所有成功的任务,但如果任务因异常而失败,它将第一次发送失败事件,并将通过 websocket 发送(正如预期的那样),但第二次将失败.
class ConversionConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.user = self.scope["user"]
self.awaiting_for_conversion = None
if self.user.is_anonymous:
await self.close()
return
await self.accept()
async def disconnect(self,close_code):
if self.awaiting_for_conversion:
await self.channel_layer.group_discard(
self.awaiting_for_conversion,self.channel_name
)
async def receive(self,text_data):
try:
action = json.loads(text_data)
# the user has to send a message asking for the conversion he wants to receive information on
if action['type'] == 'await_for_conversion':
self.awaiting_for_conversion = action['payload']['uid']
await self.channel_layer.group_add(
self.awaiting_for_conversion,self.channel_name
)
except:
pass
async def conversion_succeded(self,event):
uid = event['uid']
# Send message to WebSocket
await self.send(text_data=json.dumps({
'status': 'SUCCESS','type': 'conversion_has_finish','uid': uid
}))
async def conversion_Failed(self,event):
# This will be called the first time an exception is raised and caused the task to fail,but not if another task is called and fails the same way
uid = event['uid']
error = event['error']
# Send message to WebSocket
await self.send(text_data=json.dumps({
'status': 'FAILURE','uid': uid,'error': error
}))
这是我的芹菜任务(忽略导入)
class ConversionTask(Task):
def on_success(self,exc,task_id,args,kwargs):
message = {'type': 'conversion_succeded','uid': self.uid}
async_to_sync(self.channel_layer.group_send)(self.uid,message)
if isfile(f"./tmp/{self.uid}"):
remove(f"./tmp/{self.uid}")
def on_failure(self,kwargs,einfo):
error = str(exc) if exc else []
message = {'type': 'conversion_Failed','uid': self.uid,'error': error
}
async_to_sync(self.channel_layer.group_send)(self.uid,message)
# This is done successfully,both times
if isfile(f"./tmp/{self.uid}"):
remove(f"./tmp/{self.uid}")
@shared_task(bind=True,base=ConversionTask)
def convert_csv_file_task(self,uid,semantic_validation,conversion_id):
self.channel_layer = get_channel_layer()
self.uid = uid
self.conversion_id = conversion_id
# Do some work that may end up in an exception here...
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)