SQS 使用 groupId 和线程池处理策略?

问题描述

我正在设计 sqs FIFO 处理策略,因此尝试以正确的方式进行。面临几个基本问​​题。

例如我在队列中:

  • 100 条组 ID 为“group1_100”的消息
  • 5 条包含“group2_5”的消息
  • 1 条包含“group3_1”的消息
  • 1 条包含“group4_1”的消息

句柄示例:

我使用 longpooling 请求了 10 条消息:

1.1) 如果同一组的消息超过 10 条,我可以确定我会在检索前收到 10 条(最多请求的)消息。

1.2) 可能 sqs 从 group1_100 返回 7 条消息,从 group2_5 返回 2 条消息,从 group4_1 返回 1 条消息吗?

如果是:

1.2.1) 例如,处理 group1_100 的这 7 条消息将需要 10 秒。所以使用来自 group2_5group4_1 的线程池 3 消息已经并行处理 - 所以我对 sqs 说它们是这样处理的:

1.2.2) 向 sqs 发出另一个请求(不要等到来自 group1_100 的 7 条消息完成)。所以

1.2.2.1) 是否有可能,我可以从 group1_100 接收消息(应该在前 7 个之后处理 = 以正确的顺序)? (假设,不,因为 sqs 向我们保证它将以正确的顺序强烈检索)。我说得对吗?

如果 1.2 为“是”,我需要控制不同的“groupid”句柄以防止出现这种情况,当我的处理器仅适用于“group1_100”时,即使它们首先是从“组/时间”的角度来看,对?为了防止诸如“我们作为客户端将向您发送大量带有一个 groupId 的消息,您需要将所有句柄处理器交给我们”之类的理论攻击。

  1. 所以它可能发生在单线程处理的情况下,对吗?

  2. 一个策略能否解决这个问题?

请求获取 10 条消息 -> 将它们分成 n 个组(按 groupid 分组)并开始并行处理它们。当处理来自“组”的所有消息时-使“批处理”通知sqs关于完成->在并行中开始获取另一包要处理的消息(取决于/检查常量total_handling_messages_limit_at_time)并开始以相同的方式处理它们同步获取每秒/分钟设置太多次。

因此,它允许我们使用线程池在一个 worker 内拆分句柄,而不是与具有“一个 groupid”的“长时间运行”任务堆叠在一起。

大体上我是对的吗,伙计们?也许我在这里遗漏了什么?

解决方法

在这里做了一些调查,所以回答其中的大部分:

1.1) 如果有超过 10 条相同的消息,我可以确定我会在检索前收到 10 条(最多请求的)消息吗? 组。

不,队列可能返回 9。或 8。即使消息可检索,也比您请求的要少

1.2) sqs 可以回复我来自 group1_100 的 7 条消息、来自 group2_5 的 2 条消息和来自 group4_1 的 1 条消息吗?

绝对是的

1.2.1) 例如,处理 group1_100 的这 7 条消息将需要 10 秒。因此,使用来自 group2_5 和 group4_1 的线程池 3 消息已经并行处理 - 所以我对 sqs 说它们是这样处理的:

1.2.2) 向 sqs 发出另一个请求(不要等到来自 group1_100 的 7 条消息完成)。所以

1.2.2.1) 是否有可能,我可以从 group1_100 接收消息(应该在前 7 个之后处理 = 以正确的顺序)? (假设,不,因为 sqs 向我们保证它将以正确的顺序强烈检索)。我说得对吗?

假设是对的(所以答案是否定的)。如果您的处理程序至少从 group_1_100 检索到一条消息 - 没有处理程序不会从该组的队列消息中获取,而这一条消息将不会被处理(或者它由 sqs 本身的可见性超时返回),所以要小心具有可见性超时 - 如果您不能正确处理消息 - 组的所有消息可能会挂断超时时间。

如果 1.2 为“是”,我需要控制不同的“groupid”句柄以防止出现这种情况,当我的处理器仅适用于“group1_100”时,即使它们首先是从“组/时间”的角度来看,对?为了防止诸如“我们作为客户端将向您发送大量带有一个 groupId 的消息,您需要将所有句柄处理器交给我们”之类的理论攻击。

所以它可能发生在单线程处理的情况下,对吗?

一般来说:

  1. 您不需要控制它,因为所有具有相同 groupid 的消息将被“冻结”,而您在处理前 n 个来自它们的消息时,其他处理程序将从其他组获取消息。(请参阅上一个答案)
  2. 您绝对 100% 需要将其拆分为 线程n 个消费者,不要由一个消费者以同步方式处理。

是否可以通过下一个策略解决? :

好像是的。例外:

并且不要与具有“一个 groupid”的“长时间运行”任务堆叠在一起。

如上所述。

还想提供其他信息以供注意:

  1. 组中的消息总是按时间戳这个组内

  2. 您不能假设,将按正确的时间戳顺序接收组。例如,如果 group1 在 10:00 发布,group2 在 10:01 发布,group3 在 10:02 发布 - 不能保证您会收到 group1,然后是 group2,然后是 group3 - 它可能会从 group2 返回 5,从 group 3 返回 5例如:)此外,每个请求的组顺序也不严格(每个组中只有消息)

  3. 如果您在 group1 中有 100 条消息,并且只从中检索到一条消息 - 其他 99 条消息将等待您处理此消息。

此外,这里还有一些有用的信息:https://stackoverflow.com/a/31799733/1151741

ps。如果我哪里错了,请纠正我。

希望对大家有所帮助