如何使用HiveMQ mqtt客户端并行启用/处理订阅的主题/消息

问题描述

我们当前正在从Eclipe Paho MQTT客户端的旧版本切换到HiveMQ MQTT客户端的1.2版。 https://github.com/hivemq/hivemq-mqtt-client

当前正在使用需要使用Consumer函数作为回调的客户端的Aync-版本。

我们的MQTT客户端应用程序之一必须处理/使用许多有关许多不同主题的消息,并且处理一条消息不必等待前一个消息完成。 我们不确定什么是仅使用一个客户端实例实现并行处理消息的最佳方法

在上面的文档中,可以定义一个可选的执行器

client.subscribeWith()
    .topicFilter("test/topic")
    .qos(MqttQos.EXACTLY_ONCE)
    .callback(System.out::println)
    .executor(executor) // optional
    .send();

未定义执行程序时,Asyncclient应该如何表现? 然后,所有内容都以阻塞方式串行处理? 似乎以某种方式破坏了使用回调定义异步的目的。

在旧的实现中,我们使用共享订阅(这是HiveMQ 3的非标准功能,现在是MQTT 5的标准功能),并且客户端的多个实例一直在等待相同的实例主题以交替处理它们。

但是,考虑到HiveMQ CLient API(不幸的是,它缺少一些解释或示例),我们希望完善一种更优雅,更简单的方法,以实现与或线程池的并行处理!

任何帮助表示赞赏!

解决方法

通常,仅在将应用程序扩展到多台计算机时才需要共享订阅。 如果您可以并行处理消息,则没有理由在单台计算机上使用共享订阅。 如果将来邮件的负载会增加,您仍然可以选择共享订阅以在以后扩展到多台计算机。

由于MQTT提供了排序保证,因此HiveMQ MQTT客户端会串行调用同一回调。并行执行针对不同订阅的多个回调。 对于单个回调,只有您的应用程序可以选择分解顺序。 为此,您只需将消息从回调传递给并行工作器即可。