Django Celery: creating tasks from a JS microservice

Problem description

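Hello, I have a Django application running Celery, and I'm trying to add tasks to the queue from a microservice written in JS. When Django adds a task to AWS SQS everything works fine, but when I do it from JS I get this error: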

[2021-01-05 19:39:55,982: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?

The full contents of the message body was: body: '{"expires":null,"utc":true,"args":[5456,2878],"chord":null,"callbacks":null,"errbacks":null,"taskset":null,"id":"1a361c85-2209-4ffa-95c2-ee2e4855155e","retries":0,"task":"config.celery.debug_task","timelimit":[null,null],"eta":null,"kwargs":{}}' (244b)
{content_type:None content_encoding:None
  delivery_info:{'sqs_message': {'MessageId': '7b7d2948-c069-4f8b-9fdc-9c068d52f463','ReceiptHandle': 'AQEBxhqW2sRWf+Z851fw7nqRX6MQFVcTfjH5xqiIgYIiMa3AN3R235VxhM8pM7mcByw3eOZ3Y7kH5oZ+noFVzfjsllgnoh8idB/V7WWY2urNHKJrQadRT5cf4NcUVkFmB8+d2rLiAXuuyqpGbEMvmx1Dn49/5C3Fx8Eq+eUyB1oeilIrCqfMvIkG/yX5TdedxM9B2VBThZ/XtHqrgYCkJvEt9ozssM0f+INRHUrpVQMYCmUX9aTWeWljrTOapMTg27M6aie6HaDQxLK0FJvZUNr2d0uJhZ4C2qRGWrSo2VpD7QK7pslltZ12PVHKPw9X+cBGdWwJrdh5I0fBITuoy+CUUnybDekz668jJnsf1gcmpx8cBoVrmlocPi753g2klGf++mbFeL7yjENzb1YqZrrfvg==','MD5OfBody': '9bb39da667d1e840f8532a74a8dcecaa','Body': 'eyJleHBpcmVzIjpudWxsLCJ1dGMiOnRydWUsImFyZ3MiOls1NDU2LDI4NzhdLCJjaG9yZCI6bnVsbCwiY2FsbGJhY2tzIjpudWxsLCJlcnJiYWNrcyI6bnVsbCwidGFza3NldCI6bnVsbCwiaWQiOiIxYTM2MWM4NS0yMjA5LTRmZmetoTVjMi1lZTJlNDg1NTE1NWUiLCJyZXRyaWVzIjowLCJ0YXNrIjoiY29uZmlnLmNlbGVyeS5kZWJ1Z190YXNrIiwidGltZWxpbWl0IjpbbnVsbCxudWxsXSwiZXRhIjpudWxsLCJrd2FyZ3MiOnt9fQ=='},'sqs_queue': 'SQS_QUEUE_URL_HERE'} headers={}}

I'm using this code to send the message:

import { SQSClient, SendMessageCommand } from "@aws-sdk/client-sqs";
import { v4 as uuidv4 } from "uuid";

let taskId = uuidv4();
// Flat task dict copied from the Django task payload
let result = {
    "expires": null, "utc": true, "args": [5456, 2878],
    "chord": null, "callbacks": null, "errbacks": null, "taskset": null,
    "id": taskId, "retries": 0, "task": "config.celery.debug_task",
    "timelimit": [null, null], "eta": null, "kwargs": {}
};

const client = new SQSClient({
    region: "eu-west-3",
    credentialDefaultProvider: myCredentialProvider
});

const send = new SendMessageCommand({
    // use wrangler secrets to provide this global variable
    QueueUrl: "SQS_QUEUE_URL_HERE",
    MessageBody: Buffer.from(JSON.stringify(result)).toString("base64")
});

let resultSQS = await client.send(send);

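I debugged the Django task payload in order to replicate it, so I'm sending the same data it expects, yet I still get this error. Does anyone know if I'm missing something?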

Thanks

Solution

I solved the problem: the MessageBody was wrong. This is the solution:

import { SQSClient, SendMessageCommand } from "@aws-sdk/client-sqs";
import { v4 as uuidv4 } from "uuid";

let taskId = uuidv4();

// Celery message protocol v2 body: [args, kwargs, embed]
let body = [
    [{"hello": "world from JS", "success": true}],
    {},
    {"callbacks": null, "errbacks": null, "chain": null, "chord": null}
];

let properties = {
    "correlation_id": taskId,
    "reply_to": "5480744c-f3ca-3e3f-9403-445fa5b865e1",
    "delivery_mode": 2,
    "delivery_info": {"exchange": "", "routing_key": "default"},
    "priority": 0,
    "body_encoding": "base64",
    "delivery_tag": uuidv4()
};

// Headers that Celery reads to locate and run the task
let headers = {
    "lang": "py", "task": "config.celery.debug_task", "id": taskId,
    "shadow": null, "eta": null, "expires": null, "group": null,
    "retries": 0, "timelimit": [null, null],
    "root_id": taskId, "parent_id": null,
    "argsrepr": "({'hello': 'world from JS', 'success': True},)",
    "kwargsrepr": "{}",
    "origin": "gen21089@Marcoss-MacBook-Pro.local"
};

let payload = {
    "body": Buffer.from(JSON.stringify(body)).toString("base64"),
    "content-encoding": "utf-8",
    "content-type": "application/json",
    "headers": headers,
    "properties": properties
};

// The whole envelope is base64-encoded once more before it goes to SQS
let encodedPayload = Buffer.from(JSON.stringify(payload)).toString("base64");

const client = new SQSClient({
    region: "eu-west-3",
    credentialDefaultProvider: myCredentialProvider
});

const sendData = new SendMessageCommand({
    // use wrangler secrets to provide this global variable
    QueueUrl: "URL",
    MessageBody: encodedPayload
});

let data = await client.send(sendData);

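So the base64-encoded payload is an envelope containing a "body" that is itself base64-encoded, plus the headers Celery reads to locate the task. The original attempt sent only the flat task dict, with no headers or properties, which is why the worker rejected it as an unknown message.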
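To make the two encoding layers easier to check, here is a minimal sketch that wraps the envelope construction from the solution in a function and adds a decoder for round-trip inspection. The names buildCeleryMessage and decodeCeleryMessage are hypothetical helpers, not part of Celery or the AWS SDK. The "origin" value is a placeholder, "reply_to" is just a fresh UUID, and "argsrepr"/"kwargsrepr" use JSON strings rather than Python reprs, on the assumption that they are only used for display. It uses Node's built-in crypto.randomUUID in place of the uuid package so that it runs without dependencies.

```javascript
const { randomUUID } = require("crypto");

// Hypothetical helper: builds the envelope shape shown in the solution
// and returns the string to pass as the SQS MessageBody.
function buildCeleryMessage(taskName, args = [], kwargs = {}, routingKey = "default") {
    const taskId = randomUUID();
    // Inner body: [args, kwargs, embed]
    const body = [args, kwargs, { callbacks: null, errbacks: null, chain: null, chord: null }];
    const envelope = {
        // First base64 layer: the body only
        body: Buffer.from(JSON.stringify(body)).toString("base64"),
        "content-encoding": "utf-8",
        "content-type": "application/json",
        headers: {
            lang: "py", task: taskName, id: taskId, root_id: taskId, parent_id: null,
            shadow: null, eta: null, expires: null, group: null,
            retries: 0, timelimit: [null, null],
            argsrepr: JSON.stringify(args),     // assumption: display only
            kwargsrepr: JSON.stringify(kwargs), // assumption: display only
            origin: "js-microservice",          // placeholder value
        },
        properties: {
            correlation_id: taskId,
            reply_to: randomUUID(),             // assumption: any unique id
            delivery_mode: 2,
            delivery_info: { exchange: "", routing_key: routingKey },
            priority: 0,
            body_encoding: "base64",
            delivery_tag: randomUUID(),
        },
    };
    // Second base64 layer: the whole envelope
    return Buffer.from(JSON.stringify(envelope)).toString("base64");
}

// Hypothetical helper: reverses both base64 layers for inspection.
function decodeCeleryMessage(messageBody) {
    const envelope = JSON.parse(Buffer.from(messageBody, "base64").toString("utf-8"));
    const body = JSON.parse(Buffer.from(envelope.body, "base64").toString("utf-8"));
    return { envelope, body };
}

const encoded = buildCeleryMessage("config.celery.debug_task", [{ hello: "world from JS", success: true }]);
const { envelope, body } = decodeCeleryMessage(encoded);
console.log(envelope.headers.task, body[0]);
```

Running decodeCeleryMessage on the "Body" field from the warning log in the question would fail at envelope.body, since that message has no envelope at all, which is one quick way to spot the mismatch.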
