在RabbitMQ中按ID对消息进行分组或批处理

问题描述

我已经在一个客户端上为多个消费者创建了队列。我以Pair<String,String>的形式发送消息-第一个字符串是ID,第二个是消息本身。我想按第一个字符串对消息进行分组。

@RabbitListener(queues = ["\${app.rabbitmq.queuename:test}"])
fun receiveBatch(messages: List<Pair<String,String>>) {
    val groupedMessages = messages.asSequence()
        .groupBy({ it.first },{ it.second })
        .mapValues { it.value.fold("") { acc,s -> groupFrameMessages(acc,s) } }
        .also { log.debug { "Grouped messages contains Ids: ${it.keys}}" } }
        .toList()

    groupMessages.forEach{ "Handle $it" }
}

为了处理并发性,每个消息ID我只需要一个使用者,因为要处理该消息的操作很繁琐,如果第二个使用者尝试连接,则会完全中断,从而导致数据丢失。当前的解决方案是lockMap:

val lockMap = ConcurrentHashMap<String,Lock>()

如果我将连接多个实例,它将无法正常工作。我必须做两件事-首先是实现分布式锁定,或者,更好的解决方案是将ID为1的消息的并发使用者限制为第二个;其次是通过ID将这些消息分组。我该怎么做?有什么办法可以通过该ID对邮件进行分组?有没有一种方法可以通过id创建动态队列?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)