问题描述
我有一个简单的MQTT客户端,它通过IntegrationFlow
输出收到的消息:
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] { "tcp://test.mosquitto.org:1883" });
factory.setConnectionOptions(options);
return factory;
}
public MessageProducerSupport mqttInbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
"myConsumer",mqttClientFactory(),"/test/#");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
return adapter;
}
public IntegrationFlow mqttInFlow() {
return IntegrationFlows.from(mqttInbound())
.transform(p -> p + ",received from MQTT")
.handle(logger())
.get();
}
private LoggingHandler logger() {
LoggingHandler loggingHandler = new LoggingHandler("INFO");
loggingHandler.setLoggerName("siSample");
return loggingHandler;
}
我需要将所有收到的消息通过管道传送到Flux
中,以便进行进一步处理。
public Flux<String> mqttChannel() {
...
return mqttFlux;
}
我该怎么做? loggingHandler从IntegrationFlow接收所有消息。我的Flux不能以类似的方式获取它的输入-通过某种方式将其传递给IntegrationFlows处理函数吗?
尝试:根据Artem Bilans的建议,我现在正尝试使用toReactivePublisher
将入站IntegrationFlow
转换为Flux
。
public Flux<String> mqttChannel() {
Publisher<Message<Object>> flow = IntegrationFlows.from(mqttInbound())
.toReactivePublisher();
Flux<String> mqttFlux = Flux.from(flow)
.log()
.map(i -> "TESTING: Received a MQTT message");
return mqttFlux;
}
运行示例时,出现以下错误:
10:14:39.541 [MQTT Call: myConsumer] ERROR o.s.i.m.i.MqttPahoMessageDrivenChannelAdapter - Unhandled exception for GenericMessage [payload=OFF,26.70,65.00,663,-62,192.168.2.100,0.026,25,4,6,7,933,278,27,1,1580496218,730573600,1800000,1980000,10800000,11880000,headers={mqtt_receivedRetained=true,mqtt_id=0,mqtt_duplicate=false,id=3f7565aa-ff4f-c389-d8a9-712d4f06f1cb,mqtt_receivedTopic=/083B7036697886C41D2DF2FD919143EE/MasterBedroom/Sensor/,mqtt_receivedQos=0,timestamp=1602231279537}]
结论:第一条消息到达后,即被错误处理并引发异常。
解决方法
不清楚您希望通过“我的通量”实现什么以及外观如何,但是对于您当前的配置,有几种解决方案。
您可以使用已经是FluxMessageChannel
的{{1}},因此您可以简单地使用Publisher
和订阅者来使用上述Flux.from()
产生的数据。 / p>
另一种方法是在MqttPahoMessageDrivenChannelAdapter
上使用toReactivePublisher()
,以将整个流作为反应性IntegrationFlowBuilder
源公开。当然,在这种情况下,您不能使用Publsiher
,因为它是单向的,并使您的流恰好在此处结束。不过,您可以考虑使用LoggingHandler
运算符:https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/dsl.html#java-dsl-log
通过log()
进行发布订阅的方式,因此您可以将其保存在这些日志的流中,也可以将其外部保存以进行FluxMessageChannel
的订阅。该频道的所有订户将收到相同的消息。