在RabbitMQ中按ID对消息进行分组或批处理

问题描述

我已经在一个客户端上为多个消费者创建了队列。我以Pair<String,String>的形式发送消息-第一个字符串是ID,第二个是消息本身。我想按第一个字符串对消息进行分组。

@RabbitListener(queues = ["\${app.rabbitmq.queuename:test}"])
fun receiveBatch(messages: List<Pair<String,String>>) {
    val groupedMessages = messages.asSequence()
        .groupBy({ it.first },{ it.second })
        .mapValues { it.value.fold("") { acc,s -> groupFrameMessages(acc,s) } }
        .also { log.debug { "Grouped messages contains Ids: ${it.keys}}" } }
        .toList()

    groupMessages.forEach{ "Handle $it" }
}

为了处理并发性,每个消息ID我只需要一个使用者,因为要处理该消息的操作很繁琐,如果第二个使用者尝试连接,则会完全中断,从而导致数据丢失。当前的解决方案是lockMap:

val lockMap = ConcurrentHashMap<String,Lock>()

如果我将连接多个实例,它将无法正常工作。我必须做两件事-首先是实现分布式锁定,或者,更好的解决方案是将ID为1的消息的并发使用者限制为第二个;其次是通过ID将这些消息分组。我该怎么做?有什么办法可以通过该ID对邮件进行分组?有没有一种方法可以通过id创建动态队列?

解决方法

是;对于每个ID使用不同的队列,并且每个队列只有一个使用者,这将是最简单的。

您可以使用RabbitAdmin动态创建队列和绑定,并且可以根据需要将队列添加/删除到侦听器容器。最好为此使用DirectMessageListenerContainer-请参见Choosing a Container

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...