Java:MQTT MessageProducer对Flux的支持

问题描述

我有一个简单的MQTT客户端,它通过IntegrationFlow输出收到的消息:

public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    MqttConnectOptions options = new MqttConnectOptions();
    options.setServerURIs(new String[] { "tcp://test.mosquitto.org:1883" });
    factory.setConnectionOptions(options);
    return factory;
}

public MessageProducerSupport mqttInbound() {
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
            "myConsumer",mqttClientFactory(),"/test/#");
    adapter.setCompletionTimeout(5000);
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setQos(1);
    return adapter;
}

public IntegrationFlow mqttInFlow() {
    return IntegrationFlows.from(mqttInbound())
            .transform(p -> p + ",received from MQTT")
            .handle(logger())
            .get();
}

private LoggingHandler logger() {
    LoggingHandler loggingHandler = new LoggingHandler("INFO");
    loggingHandler.setLoggerName("siSample");
    return loggingHandler;
}

我需要将所有收到的消息通过管道传送到Flux中,以便进行进一步处理。

public Flux<String> mqttChannel() {
    ...
    return mqttFlux;
}

我该怎么做? loggingHandler从IntegrationFlow接收所有消息。我的Flux不能以类似的方式获取它的输入-通过某种方式将其传递给IntegrationFlows处理函数吗?

MQTT示例代码取自https://github.com/spring-projects/spring-integration-samples/blob/master/basic/mqtt/src/main/java/org/springframework/integration/samples/mqtt/Application.java

尝试:根据Artem Bilans的建议,我现在正尝试使用toReactivePublisher将入站IntegrationFlow转换为Flux

public Flux<String> mqttChannel() {
    Publisher<Message<Object>> flow = IntegrationFlows.from(mqttInbound())
            .toReactivePublisher();
    Flux<String> mqttFlux = Flux.from(flow)
            .log()
            .map(i -> "TESTING: Received a MQTT message");
    return mqttFlux;
}

运行示例时,出现以下错误:

10:14:39.541 [MQTT Call: myConsumer] ERROR o.s.i.m.i.MqttPahoMessageDrivenChannelAdapter - Unhandled exception for GenericMessage [payload=OFF,26.70,65.00,663,-62,192.168.2.100,0.026,25,4,6,7,933,278,27,1,1580496218,730573600,1800000,1980000,10800000,11880000,headers={mqtt_receivedRetained=true,mqtt_id=0,mqtt_duplicate=false,id=3f7565aa-ff4f-c389-d8a9-712d4f06f1cb,mqtt_receivedTopic=/083B7036697886C41D2DF2FD919143EE/MasterBedroom/Sensor/,mqtt_receivedQos=0,timestamp=1602231279537}]

结论:第一条消息到达后,即被错误处理并引发异常。

解决方法

请阅读此文档:https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/reactive-streams.html#reactive-streams

不清楚您希望通过“我的通量”实现什么以及外观如何,但是对于您当前的配置,有几种解决方案。

您可以使用已经是FluxMessageChannel的{​​{1}},因此您可以简单地使用Publisher和订阅者来使用上述Flux.from()产生的数据。 / p>

另一种方法是在MqttPahoMessageDrivenChannelAdapter上使用toReactivePublisher(),以将整个流作为反应性IntegrationFlowBuilder源公开。当然,在这种情况下,您不能使用Publsiher,因为它是单向的,并使您的流恰好在此处结束。不过,您可以考虑使用LoggingHandler运算符:https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/dsl.html#java-dsl-log

通过log()进行发布订阅的方式,因此您可以将其保存在这些日志的流中,也可以将其外部保存以进行FluxMessageChannel的订阅。该频道的所有订户将收到相同的消息。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...