按ID将RabbitMQ消息分组以同步处理它们

问题描述

我正在开发具有两个应用程序的解决方案:仅一个发布者和仅onde消费者。该客户需要处理以下消息:

2020-09-30T19:22:41 MSG1) {id:'user1.123',msg: '{ANYJSON}'}
2020-09-30T19:22:42 MSG2) {id:'user2.345',msg: '{ANYJSON}'}
2020-09-30T19:22:43 MSG3) {id:'user3.877',msg: '{ANYJSON}'}
2020-09-30T19:22:43 MSG4) {id:'user1.123',msg: '{ANYJSON}'}
2020-09-30T19:22:43 MSG5) {id:'user1.123',msg: '{ANYJSON}'}

每条消息需要处理5-10秒(数据库查询,4个HTTP请求,数据库插入e等),我为每个消息创建一个池(https://threads.js.org/usage-pool),因为它需要重复命令(数据库查询,4个http请求,数据库插入e等)供许多用户使用。这样每个POOL / MSG可以同时运行。

这些IDS由发布者动态生成,并且每次都会更改。基本上,我需要按ID对消息进行“分组”,并且仅当没有消息正在同时处理此ID时才处理消息。例如:

  • MSG 1(2020-09-30T19:22:41)应该立即处理
  • MSG 2(2020-09-30T19:22:42)应该立即处理
  • MSG 3(2020-09-30T19:22:43)应该立即处理
  • MSG 4(2020-09-30T19:22:43)应该等待MSG 1被处理
  • MSG 5(2020-09-30T19:22:43)应该等待MSG 4被处理

假设消费者1分钟后收到:

2020-09-30T19:23:41 MSG6) {id:'user1.123',msg: '{ANYJSON}'}
2020-09-30T19:23:42 MSG7) {id:'user2.345',msg: '{ANYJSON}'}
  • MSG 6(2020-09-30T19:23:41)应该立即处理,因为MSG5已被处理。
  • MSG 7(2020-09-30T19:23:41)应该立即处理,因为MSG2已被处理。

enter image description here

在366行,我知道一个池何时完成,因此是时候向另一个MSG询问相同的ID。

那么,如何实现这个按ID“分组”的队列?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)