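Spring Cloud Stream Reactive - Handling Message Exceptions

Problem description

I am currently working on a Kotlin project that uses RabbitMQ with Reactor to receive messages of a certain DTO type and send them on when they meet specific criteria. While testing my code, I tried simulating bad message input (since the messages come from an external service) to see how the subscriber behaves. As soon as I receive a bad input message, the subscriber stops listening for any new input and throws the following exception: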

 org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.calculateAverage-in-0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers,failedMessage=GenericMessage

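I then ran the official example from Spring and changed the supplier to send bad data on the first dispatch and valid data afterwards, to observe the behavior.

On the supplier side, I added a counter so that the bad message is sent only on the first run.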

//Following source and sinks are used for testing only.
//Test source will send data to the same destination where the processor receives data
//Test sink will consume data from the same destination where the processor produces data
// ------ New Code -------
static int x = 0;
// ------ END New Code -------

static class TestSource {

    private AtomicBoolean semaphore = new AtomicBoolean(true);
    private Random random = new Random();
    private int[] ids = new int[]{100100,100200,100300};

    @Bean
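    public Supplier<?> sendTestData() {

        return () -> {
            // ------ New Code -------
            // Send a non-JSON payload on the first invocation only; the counter is
            // incremented before returning so later calls fall through to valid data.
            if (x == 0) {
                x++;
                return "hey";
            }
            // ------ END New Code -------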
            int id = ids[random.nextInt(3)];
            int temperature = random.nextInt((102 - 65) + 1) + 65;
            Sensor sensor = new Sensor();
            sensor.setId(id);
            sensor.setTemperature(temperature);
            return sensor;
        };
    }
}

Subscriber side:

@Bean
public Function<Flux<Sensor>,Flux<Average>> calculateAverage() {
    return data -> data.window(Duration.ofSeconds(3)).flatMap(
            window -> window.groupBy(Sensor::getId).flatMap(this::calculateAverage));
}

private Mono<Average> calculateAverage(GroupedFlux<Integer,Sensor> group) {
    return group
            .reduce(new Accumulator(0,0),(a,d) -> new Accumulator(a.getCount() + 1,a.getTotalValue() + d.getTemperature()))
            .map(accumulator -> new Average(group.key(),(accumulator.getTotalValue()) / accumulator.getCount()));
}

As I suspected, the result shows that after failing on the bad input, the subscriber cannot go on to process the next valid message:

2021-02-21 17:46:57.905  INFO 30702 --- [lOCpiq_lPYTgA-1] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'application.calculateAverage-in-0' has 0 subscriber(s).
2021-02-21 17:46:57.907 ERROR 30702 --- [lOCpiq_lPYTgA-1] onfiguration$FunctionToDestinationBinder : Failure was detected during execution of the reactive function 'calculateAverage'
2021-02-21 17:46:57.910 ERROR 30702 --- [lOCpiq_lPYTgA-1] reactor.core.publisher.Operators         : Operator called default onErrorDropped

org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Unrecognized token 'hey': was expecting (JSON String,Number,Array,Object or token 'null','true' or 'false')
 at [Source: (byte[])"hey"; line: 1,column: 4]; nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hey': was expecting (JSON String,column: 4]
    at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:235) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]
    at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertFromInternal(ApplicationJsonMessageMarshallingConverter.java:110) ~[spring-cloud-stream-3.0.11.RELEASE.jar:3.0.11.RELEASE]
    at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:197) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]
    at org.springframework.messaging.converter.CompositeMessageConverter.fromMessage(CompositeMessageConverter.java:70) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.fromMessage(SimpleFunctionRegistry.java:932) ~[spring-cloud-function-context-3.0.13.RELEASE.jar:3.0.13.RELEASE]
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertInputValueIfNecessary(SimpleFunctionRegistry.java:833) ~[spring-cloud-function-context-3.0.13.RELEASE.jar:3.0.13.RELEASE]
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.lambda$convertInputPublisherIfNecessary$9(SimpleFunctionRegistry.java:772) ~[spring-cloud-function-context-3.0.13.RELEASE.jar:3.0.13.RELEASE]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:100) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
    at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:432) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
    at reactor.core.publisher.EmitterProcessor.onNext(EmitterProcessor.java:274) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
    at org.springframework.integration.util.IntegrationReactiveUtils.lambda$null$8(IntegrationReactiveUtils.java:133) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1300(AmqpInboundChannelAdapter.java:65) ~[spring-integration-amqp-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:294) ~[spring-integration-amqp-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) ~[spring-retry-1.2.5.RELEASE.jar:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180) ~[spring-retry-1.2.5.RELEASE.jar:na]
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:290) ~[spring-integration-amqp-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1591) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1510) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1498) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1489) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1433) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:975) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:921) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1296) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1202) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
2021-02-21 17:47:00.917 ERROR 30702 --- [lOCpiq_lPYTgA-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.calculateAverage-in-0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers,failedMessage=GenericMessage [payload=byte[3],headers={amqp_receivedDeliveryMode=PERSISTENT,amqp_receivedExchange=sensor,amqp_deliveryTag=4,deliveryAttempt=3,amqp_consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA,amqp_redelivered=false,amqp_receivedRoutingKey=sensor,amqp_timestamp=Sun Feb 21 17:46:57 IST 2021,amqp_messageId=c6430a53-a916-524f-f436-9fa34f1ba4f9,id=6493e09a-4b4d-37f9-615d-24faa3f3b2f2,amqp_consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg,sourceData=(Body:'hey' MessageProperties [headers={},timestamp=Sun Feb 21 17:46:57 IST 2021,messageId=c6430a53-a916-524f-f436-9fa34f1ba4f9,contentType=application/json,contentLength=0,receivedDeliveryMode=PERSISTENT,priority=0,redelivered=false,receivedExchange=sensor,receivedRoutingKey=sensor,deliveryTag=4,consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg,consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA]),timestamp=1613922417903}],timestamp=1613922417903}]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1300(AmqpInboundChannelAdapter.java:65)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:294)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:290)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1591)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1510)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1498)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1489)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1433)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:975)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:921)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1296)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1202)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers,timestamp=1613922417903}]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    ... 23 more

2021-02-21 17:47:03.507  INFO 30702 --- [extShutdownHook] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2021-02-21 17:47:03.936 ERROR 30702 --- [lOCpiq_lPYTgA-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.calculateAverage-in-0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers,amqp_deliveryTag=5,amqp_timestamp=Sun Feb 21 17:46:58 IST 2021,amqp_messageId=b67652f5-1842-4c13-596d-295b36002217,id=4f2a90f9-5aae-0fae-507d-787694f605dc,timestamp=Sun Feb 21 17:46:58 IST 2021,messageId=b67652f5-1842-4c13-596d-295b36002217,deliveryTag=5,timestamp=1613922420930}],timestamp=1613922420930}]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1300(AmqpInboundChannelAdapter.java:65)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:294)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:290)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1591)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1510)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1498)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1489)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1433)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:975)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:921)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1296)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1202)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers,timestamp=1613922420930}]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    ... 23 more

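My question is: how do I handle exceptions that happen "behind the scenes", such as during input parsing?

Solution

Given the nature of reactive programming this is a tricky one, so we may need to move this discussion into an issue. Feel free to raise one, but here is my take.

The fundamental difference between reactive and imperative functions is the concept of the unit of work. For imperative functions the unit of work is a single Message, so the framework retains constant control over the stream and hands it to the function one Message at a time. As a result you would expect, and rightfully so, that we have something in place for error handling regardless of where the error occurs, and we do.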
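To make the per-message unit of work concrete, here is a toy model in plain Java. It is not how Spring Cloud Stream is implemented; `ImperativeDispatchSketch`, `Outcome`, and `dispatch` are hypothetical names invented for this sketch. It only shows that when the caller invokes the function once per message, a failure stays confined to that message and the caller can carry on with the next one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Toy model only: this is NOT Spring Cloud Stream's dispatcher. It just shows
// why a per-message unit of work lets the caller keep control after a failure.
final class ImperativeDispatchSketch {

    // Hypothetical holder for the outcome of one dispatch run.
    static final class Outcome {
        final List<Integer> results = new ArrayList<>();
        final List<String> failed = new ArrayList<>();
    }

    // Invokes the user function once per message; a failure affects only that message.
    static Outcome dispatch(List<String> messages, Function<String, Integer> userFunction) {
        Outcome outcome = new Outcome();
        for (String message : messages) {
            try {
                outcome.results.add(userFunction.apply(message));
            } catch (RuntimeException e) {
                // The caller still owns the loop, so it could retry, log, or dead-letter here.
                outcome.failed.add(message);
            }
        }
        return outcome;
    }

    public static void main(String[] args) {
        // "hey" cannot be parsed, but the messages after it are still processed.
        Outcome outcome = dispatch(Arrays.asList("70", "hey", "72"), Integer::parseInt);
        System.out.println("results=" + outcome.results + " failed=" + outcome.failed);
    }
}
```

With a reactive function there is no such loop on the caller's side: the function is called once with the whole stream, so there is no per-message call to wrap.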

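With reactive functions the world changes completely, since the unit of work is the entire stream and the function acts only as a connector between the stream provided by the framework and the stream operations defined by the user. At that point s-c-stream has no control over what the user does, so our general recommendation, especially given how rich the reactive API is when it comes to error handling, is for the user to handle errors themselves. But understand that this is not because we don't want to, but because we can't, as we have no view of the stream at that point.

Your issue is indeed rather unique, since the exception occurs before the steps you defined are executed, specifically in the type conversion that we provide. There is in fact something we could do to help with that, but we are still trying to reach consensus on what that something should be, and until we do, fail fast is the solution. You can work around it by fixing your input, since it is clearly not JSON, and/or by relaxing your function signature to Function<Flux<byte[]>,Flux<Average>> and handling the type conversion yourself.

Anyway, as you can see I am open to suggestions, so feel free to raise an issue, post, and provide your input.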
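As a rough illustration of handling the conversion yourself, the sketch below turns a raw byte[] payload into an Optional instead of throwing. Everything in it is an assumption made for the example: `SensorReading` stands in for the question's Sensor DTO, and the regex is only a placeholder so the code runs on the plain JDK; a real application would use its JSON library of choice inside the same try-and-return-empty shape.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch only. SensorReading and the regex "parser" are hypothetical stand-ins so the
// example runs on the plain JDK; a real application would use its own DTO and a JSON library.
final class TolerantConversionSketch {

    static final class SensorReading {
        final int id;
        final int temperature;

        SensorReading(int id, int temperature) {
            this.id = id;
            this.temperature = temperature;
        }
    }

    // Accepts only the exact shape {"id":<int>,"temperature":<int>} (whitespace allowed).
    private static final Pattern SHAPE = Pattern.compile(
            "\\s*\\{\\s*\"id\"\\s*:\\s*(\\d+)\\s*,\\s*\"temperature\"\\s*:\\s*(-?\\d+)\\s*\\}\\s*");

    // Returns empty instead of throwing, so one bad payload never becomes an error signal.
    static Optional<SensorReading> tryConvert(byte[] payload) {
        Matcher m = SHAPE.matcher(new String(payload, StandardCharsets.UTF_8));
        if (!m.matches()) {
            return Optional.empty();
        }
        try {
            return Optional.of(new SensorReading(
                    Integer.parseInt(m.group(1)), Integer.parseInt(m.group(2))));
        } catch (NumberFormatException e) {
            // Digits were present but do not fit into an int.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        byte[] bad = "hey".getBytes(StandardCharsets.UTF_8);
        byte[] good = "{\"id\":100100,\"temperature\":70}".getBytes(StandardCharsets.UTF_8);
        System.out.println("bad payload converted:  " + tryConvert(bad).isPresent());
        System.out.println("good payload converted: " + tryConvert(good).isPresent());
    }
}
```

Inside a function declared as Function<Flux<byte[]>,Flux<Average>>, a call along the lines of data.flatMap(bytes -> Mono.justOrEmpty(tryConvert(bytes))) would then drop unparseable payloads before the windowing step; whether to drop, log, or forward them elsewhere is a design decision for the application.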

,
@Bean
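Function<Flux<?>,Flux<?>> myFunction() {
    // The bean must return a function of the inbound Flux, hence the lambda.
    return messageFlux -> messageFlux
        // Skip elements that fail message conversion instead of terminating the stream.
        .onErrorContinue(MessageConversionException.class,(e,obj) -> {log.warn(...)})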
        ...
}

This has worked for me before. It allows the upstream to continue past conversion exceptions.

,

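Here is my take, along with the strategies I use to deal with all of this:

First, I try to redirect known bad input to an errors topic (as already suggested here):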

private Flux<PointData> knownErrorsHandling(Flux<PointData> pointFlux) {
    // For known exceptions, forward the offending element to the errors binding and keep the stream alive.
    return pointFlux.onErrorContinue(
        this::isKnownExceptionThatShouldBeRedirectedToErrorsChannel,
        (e, o) -> streamBridge.send(appProperties.getErrorsBindingName(), o));
}

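If the exception is not handled, the subscription is cancelled. That is why we see the Dispatcher has no subscribers error.

A curious side effect of not resubscribing after the subscription is cancelled is that the Kafka message is acknowledged after several attempts to process it have thrown the Dispatcher has no subscribers exception, which effectively loses the message.

We could use one of the Flux#retry() variants, but that only makes the subscriber resubscribe; it does not attempt to reprocess the same message.

My approach is to avoid Reactor retries and exit the application if an unrecoverable exception occurs, relying on the underlying platform to restart the application in the hope that it works after being recycled: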

    @EventListener
    public void onApplicationStartedRegisterReactorHookToStopApplicationIfErrorDropped(ApplicationStartedEvent e) {
        Hooks.onErrorDropped(t -> {
            LOG.error("Unable to recover from exception.",t);
            e.getApplicationContext().close();
            System.exit(1);
        });
    }

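I also turn off auto-committing of offsets with spring.cloud.stream.kafka.bindings.<binding>.consumer.autoCommitOffset: false, especially when I rely on Reactor's .bufferTimeout, because I noticed that it triggers an acknowledgement for every message even though the batch may still fail.

Overall, I feel that the reactive approach is not a good fit for Spring Cloud Stream. At least not yet.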
