问题描述
我正在使用 AWS CDK 创建一个状态机,该状态机将消息发送到 fifo 队列并等待来自 lambda 工作线程的回调以继续执行。
我希望发送到 fifo 队列的消息具有分配给它们的动态 MessageGroupId,以便我可以控制处理消息的 lambda 工作线程的数量。我能想到的拥有动态 MessageGroupId 的唯一方法是使用 JsonPath 在 step 函数输入上引用一些参数,但是我没有遇到任何关于它的文档。我使用 JsonPath 动态传递 MessageGroupId 的初始测试失败了,只需传递字符串“$.MessageGroupId”即可有效地为每条消息提供相同的消息组 ID,从而为一个 lambda 工作者提供一个。
- 是否可以在从步进函数发送时为 sqs 消息动态分配消息组 ID?
- 如果是,怎么做?
解决方法
在 AWS Support 的帮助下,我设法通过使用 Context Object 或从初始输入传递 ID 并使用 $
引用它来做到这一点。
这是一个例子:
{
"Comment": "Generate unique MessageGroupId","StartAt": "Start","States": {
"Start": {
"Type": "Task","TimeoutSeconds": 60,"Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken","Parameters": {
"QueueUrl": "<YOUR_QUEUE_URL>","MessageBody": {
"Input.$": "$","TaskToken.$": "$$.Task.Token"
},"MessageGroupId.$": "$$.Execution.Id"
},"ResultPath": "$","End": true
}
}
}
我的问题是我试图MessageGroupId
像这样:
"MessageGroupId": "$$.Execution.Id"
我应该做的地方:
"MessageGroupId.$": "$$.Execution.Id"
附加 .$
将解析表达式 "$$.Execution.Id"
而不是字面上的字符串 "$$.Execution.Id"
。