问题描述
我目前正在 Kotlin 中开展一个项目,该项目使用带有反应器的 rabbit 来接收某种 DTO 类型的消息,并在它们达到特定标准时将它们发送出去。在测试我的代码的过程中,我尝试模拟错误的消息输入(因为消息来自外部服务)并查看订阅者的行为。一旦我收到一条错误的输入消息,订阅者就会停止收听任何新的输入并抛出以下异常:
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.calculateAverage-in-0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers,failedMessage=GenericMessage
然后我尝试从 spring 运行 official example 并更改供应商以在第一次分派时发送错误数据,然后发送有效数据并查看行为。
在供应商方面,我添加了一个索引器以仅在第一次运行时发送错误消息。
//Following source and sinks are used for testing only.
//Test source will send data to the same destination where the processor receives data
//Test sink will consume data from the same destination where the processor produces data
// ------ New Code -------
static int x = 0;
// ------ END New Code -------
static class TestSource {
private AtomicBoolean semaphore = new AtomicBoolean(true);
private Random random = new Random();
private int[] ids = new int[]{100100,100200,100300};
@Bean
public Supplier<?> sendTestData() {
return () -> {
// ------ New Code -------
if(x==0) {
return "hey";
}
x++;
// ------ END New Code -------
int id = ids[random.nextInt(3)];
int temperature = random.nextInt((102 - 65) + 1) + 65;
Sensor sensor = new Sensor();
sensor.setId(id);
sensor.setTemperature(temperature);
return sensor;
};
}
}
订阅方:
@Bean
public Function<Flux<Sensor>,Flux<Average>> calculateAverage() {
return data -> data.window(Duration.ofSeconds(3)).flatMap(
window -> window.groupBy(Sensor::getId).flatMap(this::calculateAverage));
}
private Mono<Average> calculateAverage(GroupedFlux<Integer,Sensor> group) {
return group
.reduce(new Accumulator(0,0),(a,d) -> new Accumulator(a.getCount() + 1,a.getTotalValue() + d.getTemperature()))
.map(accumulator -> new Average(group.key(),(accumulator.getTotalValue()) / accumulator.getCount()));
}
正如我怀疑的那样,结果表明订阅者在错误输入失败后无法继续处理下一条有效消息:
2021-02-21 17:46:57.905 INFO 30702 --- [lOCpiq_lPYTgA-1] o.s.c.s.m.DirectWithAttributesChannel : Channel 'application.calculateAverage-in-0' has 0 subscriber(s).
2021-02-21 17:46:57.907 ERROR 30702 --- [lOCpiq_lPYTgA-1] onfiguration$FunctionToDestinationBinder : Failure was detected during execution of the reactive function 'calculateAverage'
2021-02-21 17:46:57.910 ERROR 30702 --- [lOCpiq_lPYTgA-1] reactor.core.publisher.Operators : Operator called default onErrorDropped
org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Unrecognized token 'hey': was expecting (JSON String,Number,Array,Object or token 'null','true' or 'false')
at [Source: (byte[])"hey"; line: 1,column: 4]; nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hey': was expecting (JSON String,column: 4]
at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:235) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]
at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertFromInternal(ApplicationJsonMessageMarshallingConverter.java:110) ~[spring-cloud-stream-3.0.11.RELEASE.jar:3.0.11.RELEASE]
at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:197) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]
at org.springframework.messaging.converter.CompositeMessageConverter.fromMessage(CompositeMessageConverter.java:70) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.fromMessage(SimpleFunctionRegistry.java:932) ~[spring-cloud-function-context-3.0.13.RELEASE.jar:3.0.13.RELEASE]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertInputValueIfNecessary(SimpleFunctionRegistry.java:833) ~[spring-cloud-function-context-3.0.13.RELEASE.jar:3.0.13.RELEASE]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.lambda$convertInputPublisherIfNecessary$9(SimpleFunctionRegistry.java:772) ~[spring-cloud-function-context-3.0.13.RELEASE.jar:3.0.13.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:100) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:432) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at reactor.core.publisher.EmitterProcessor.onNext(EmitterProcessor.java:274) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
at org.springframework.integration.util.IntegrationReactiveUtils.lambda$null$8(IntegrationReactiveUtils.java:133) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1300(AmqpInboundChannelAdapter.java:65) ~[spring-integration-amqp-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:294) ~[spring-integration-amqp-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) ~[spring-retry-1.2.5.RELEASE.jar:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180) ~[spring-retry-1.2.5.RELEASE.jar:na]
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:290) ~[spring-integration-amqp-5.3.2.RELEASE.jar:5.3.2.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1591) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1510) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1498) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1489) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1433) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:975) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:921) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1296) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1202) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
2021-02-21 17:47:00.917 ERROR 30702 --- [lOCpiq_lPYTgA-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.calculateAverage-in-0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers,failedMessage=GenericMessage [payload=byte[3],headers={amqp_receivedDeliveryMode=PERSISTENT,amqp_receivedExchange=sensor,amqp_deliveryTag=4,deliveryAttempt=3,amqp_consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA,amqp_redelivered=false,amqp_receivedRoutingKey=sensor,amqp_timestamp=Sun Feb 21 17:46:57 IST 2021,amqp_messageId=c6430a53-a916-524f-f436-9fa34f1ba4f9,id=6493e09a-4b4d-37f9-615d-24faa3f3b2f2,amqp_consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg,sourceData=(Body:'hey' MessageProperties [headers={},timestamp=Sun Feb 21 17:46:57 IST 2021,messageId=c6430a53-a916-524f-f436-9fa34f1ba4f9,contentType=application/json,contentLength=0,receivedDeliveryMode=PERSISTENT,priority=0,redelivered=false,receivedExchange=sensor,receivedRoutingKey=sensor,deliveryTag=4,consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg,consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA]),timestamp=1613922417903}],timestamp=1613922417903}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1300(AmqpInboundChannelAdapter.java:65)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:294)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:290)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1591)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1510)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1498)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1489)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1433)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:975)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:921)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1296)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1202)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers,timestamp=1613922417903}]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
... 23 more
2021-02-21 17:47:03.507 INFO 30702 --- [extShutdownHook] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2021-02-21 17:47:03.936 ERROR 30702 --- [lOCpiq_lPYTgA-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.calculateAverage-in-0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers,amqp_deliveryTag=5,amqp_timestamp=Sun Feb 21 17:46:58 IST 2021,amqp_messageId=b67652f5-1842-4c13-596d-295b36002217,id=4f2a90f9-5aae-0fae-507d-787694f605dc,timestamp=Sun Feb 21 17:46:58 IST 2021,messageId=b67652f5-1842-4c13-596d-295b36002217,deliveryTag=5,timestamp=1613922420930}],timestamp=1613922420930}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1300(AmqpInboundChannelAdapter.java:65)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:294)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:290)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1591)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1510)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1498)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1489)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1433)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:975)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:921)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1296)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1202)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers,timestamp=1613922420930}]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
... 23 more
我的问题是如何处理“幕后”发生的异常,例如输入解析?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)