Django 频道 + celery:处理程序在异常时被调用,第二次调用失败

问题描述

我有一个 api 端点,它触发 celery 任务,完成后,它将结果写入 AsyncWebSocketConsumer 组(只有一个用户会在它上面)。这适用于所有成功的任务,但如果任务因异常而失败,它将第一次发送失败事件,并将通过 websocket 发送(正如预期的那样),但第二次将失败.

class ConversionConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.user = self.scope["user"]
        self.awaiting_for_conversion = None
        if self.user.is_anonymous:
            await self.close()
            return

        await self.accept()

    async def disconnect(self,close_code):
        if self.awaiting_for_conversion:
            await self.channel_layer.group_discard(
                self.awaiting_for_conversion,self.channel_name
            )

    async def receive(self,text_data):
        try:
            action = json.loads(text_data)
            # the user has to send a message asking for the conversion he wants to receive information on
            if action['type'] == 'await_for_conversion':
                self.awaiting_for_conversion = action['payload']['uid']
                await self.channel_layer.group_add(
                    self.awaiting_for_conversion,self.channel_name
                )
        except:
            pass

    async def conversion_succeded(self,event):
        uid = event['uid']
        # Send message to WebSocket
        await self.send(text_data=json.dumps({
            'status': 'SUCCESS','type': 'conversion_has_finish','uid': uid
        }))

    async def conversion_Failed(self,event):
        # This will be called the first time an exception is raised and caused the task to fail,but not if another task is called and fails the same way
        uid = event['uid']
        error = event['error']
        # Send message to WebSocket
        await self.send(text_data=json.dumps({
            'status': 'FAILURE','uid': uid,'error': error
        }))

这是我的芹菜任务(忽略导入)

class ConversionTask(Task):
    def on_success(self,exc,task_id,args,kwargs):
        message = {'type': 'conversion_succeded','uid': self.uid}
        async_to_sync(self.channel_layer.group_send)(self.uid,message)
        if isfile(f"./tmp/{self.uid}"):
            remove(f"./tmp/{self.uid}")

    def on_failure(self,kwargs,einfo):
        error = str(exc) if exc else []
        message = {'type': 'conversion_Failed','uid': self.uid,'error': error
                  }
        async_to_sync(self.channel_layer.group_send)(self.uid,message)
        # This is done successfully,both times
        if isfile(f"./tmp/{self.uid}"):
            remove(f"./tmp/{self.uid}")


@shared_task(bind=True,base=ConversionTask)
def convert_csv_file_task(self,uid,semantic_validation,conversion_id):
    self.channel_layer = get_channel_layer()
    self.uid = uid
    self.conversion_id = conversion_id

    # Do some work that may end up in an exception here...

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...