问题描述
我已经在一个客户端上为多个消费者创建了队列。我以Pair<String,String>
的形式发送消息-第一个字符串是ID,第二个是消息本身。我想按第一个字符串对消息进行分组。
@RabbitListener(queues = ["\${app.rabbitmq.queuename:test}"])
fun receiveBatch(messages: List<Pair<String,String>>) {
val groupedMessages = messages.asSequence()
.groupBy({ it.first },{ it.second })
.mapValues { it.value.fold("") { acc,s -> groupFrameMessages(acc,s) } }
.also { log.debug { "Grouped messages contains Ids: ${it.keys}}" } }
.toList()
groupMessages.forEach{ "Handle $it" }
}
为了处理并发性,每个消息ID我只需要一个使用者,因为要处理该消息的操作很繁琐,如果第二个使用者尝试连接,则会完全中断,从而导致数据丢失。当前的解决方案是lockMap:
val lockMap = ConcurrentHashMap<String,Lock>()
如果我将连接多个实例,它将无法正常工作。我必须做两件事-首先是实现分布式锁定,或者,更好的解决方案是将ID为1的消息的并发使用者限制为第二个;其次是通过ID将这些消息分组。我该怎么做?有什么办法可以通过该ID对邮件进行分组?有没有一种方法可以通过id创建动态队列?
解决方法
是;对于每个ID使用不同的队列,并且每个队列只有一个使用者,这将是最简单的。
您可以使用RabbitAdmin
动态创建队列和绑定,并且可以根据需要将队列添加/删除到侦听器容器。最好为此使用DirectMessageListenerContainer
-请参见Choosing a Container。