Spring Cloud Stream + Kafka 动态目标 + 错误处理

问题描述

我们在 Spring Cloud Stream 上有一个与 Project Reactor 集成的应用程序。我们通过在 Message 头中设置 spring.cloud.stream.sendto.destination 来动态设置目标主题并发布消息。 我们正在寻求处理错误情况,例如 kafka 服务器间歇性关闭或发布时主题不可用。 我们已经实现了@ServiceActivator 来处理所有错误。动态设置主题时,ServiceActivator 不会捕获生产者错误,只有消费者错误会命中 ServiceActivator。 如果我们提前设置 out.destination 主题,那么生产者错误也会被捕获。 有什么方法可以为动态目的地启用 errorChannel 吗? 下面的示例代码, 应用程序.yml

spring:
  cloud:
    function:
      deFinition: generate_flux;process
    stream:
      kafka:
        binder:
          brokers: localhost:9094
          required-acks: all
          producer-properties:
            retries: 2
      bindings:
        generate_flux-out-0:
          destination: source
        process-in-0:
          destination: source
          group: processors
        process-out-0:
          producer:
            error-channel-enabled: true

在应用程序类中,具有动态目标主题集的函数

@Bean
 public Function<Flux<Message<String>>,Flux<Message<String>>> process() {
     return (msg) -> {
       return msg.flatMap(
           message -> {
             log.info("individual msg " + message);
             Map<String,Object> headers = new HashMap<>();
             headers.put("spring.cloud.stream.sendto.destination","processed");
             MessageHeaders mh = new MessageHeaders(headers);
             return Mono.just(MessageBuilder.createMessage("vvvv processed ",mh));
           });
     };
 }

服务激活器:

 @ServiceActivator(inputChannel = "errorChannel")
  public void handle(final ErrorMessage em) {
    log.error("Error caught" + em.toString());
  }

启动 spring boot 应用程序,确保消息被消费和发布,然后删除主题“已处理”以创建失败场景。

有了以上, Producer 出现 TimeoutException,但 Service Activator errorChannel 未捕获:

   Caused by: org.apache.kafka.common.errors.TimeoutException: Topic processed not present in Metadata after 60000 ms.
org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@f258f76]; nested exception is org.springframework.kafka.KafkaException: Send Failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic processed not present in Metadata after 60000 ms.,FailedMessage=Genericmessage [payload=byte[15],headers={id=13dc2799-45c5-0c10-2e62-4e296f62d1dc,spring.cloud.stream.sendto.destination=processed,contentType=application/json,timestamp=1620856125106}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.FluxPeekFuseable] :
    reactor.core.publisher.Flux.doOnNext
    org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.lambda$bindFunctionToDestinations$9(FunctionConfiguration.java:501)
Error has been observed at the following site(s):
    |_ Flux.doOnNext ⇢ at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.lambda$bindFunctionToDestinations$9(FunctionConfiguration.java:501)
Stack trace:
        at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
        at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1041)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
        at org.springframework.integration.dispatcher.Abstractdispatcher.tryOptimizeddispatch(Abstractdispatcher.java:115)
        at org.springframework.integration.dispatcher.Unicastingdispatcher.dodispatch(Unicastingdispatcher.java:133)
        at org.springframework.integration.dispatcher.Unicastingdispatcher.dispatch(Unicastingdispatcher.java:106)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
        at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.lambda$null$7(FunctionConfiguration.java:512)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:196)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
        at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:487)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:420)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:199)
        at reactor.core.publisher.UnicastManySinkNoBackpressure.tryEmitNext(UnicastManySinkNoBackpressure.java:119)
        at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:97)
        at org.springframework.integration.util.IntegrationReactiveUtils.lambda$null$8(IntegrationReactiveUtils.java:141)
        at org.springframework.integration.dispatcher.Abstractdispatcher.tryOptimizeddispatch(Abstractdispatcher.java:115)
        at org.springframework.integration.dispatcher.Unicastingdispatcher.dodispatch(Unicastingdispatcher.java:133)
        at org.springframework.integration.dispatcher.Unicastingdispatcher.dispatch(Unicastingdispatcher.java:106)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
        at org.springframework.messaging.core.GenericmessagingTemplate.doSend(GenericmessagingTemplate.java:187)
        at org.springframework.messaging.core.GenericmessagingTemplate.doSend(GenericmessagingTemplate.java:166)
        at org.springframework.messaging.core.GenericmessagingTemplate.doSend(GenericmessagingTemplate.java:47)
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
        at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
        at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:396)
        at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:78)
        at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:455)
        at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:429)
        at org.springframework.kafka.listener.adapter.retryingMessageListenerAdapter.lambda$onMessage$0(retryingMessageListenerAdapter.java:120)
        at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
        at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255)
        at org.springframework.kafka.listener.adapter.retryingMessageListenerAdapter.onMessage(retryingMessageListenerAdapter.java:114)
        at org.springframework.kafka.listener.adapter.retryingMessageListenerAdapter.onMessage(retryingMessageListenerAdapter.java:40)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2069)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2051)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1988)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1928)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1814)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1531)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1178)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.springframework.kafka.KafkaException: Send Failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic processed not present in Metadata after 60000 ms.
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:574)
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:389)
    at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:517)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:134)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1041)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    at org.springframework.integration.dispatcher.Abstractdispatcher.tryOptimizeddispatch(Abstractdispatcher.java:115)
    at org.springframework.integration.dispatcher.Unicastingdispatcher.dodispatch(Unicastingdispatcher.java:133)
    at org.springframework.integration.dispatcher.Unicastingdispatcher.dispatch(Unicastingdispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.lambda$null$7(FunctionConfiguration.java:512)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:196)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:487)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:420)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:199)
    at reactor.core.publisher.UnicastManySinkNoBackpressure.tryEmitNext(UnicastManySinkNoBackpressure.java:119)
    at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:97)
    at org.springframework.integration.util.IntegrationReactiveUtils.lambda$null$8(IntegrationReactiveUtils.java:141)
    at org.springframework.integration.dispatcher.Abstractdispatcher.tryOptimizeddispatch(Abstractdispatcher.java:115)
    at org.springframework.integration.dispatcher.Unicastingdispatcher.dodispatch(Unicastingdispatcher.java:133)
    at org.springframework.integration.dispatcher.Unicastingdispatcher.dispatch(Unicastingdispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericmessagingTemplate.doSend(GenericmessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericmessagingTemplate.doSend(GenericmessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericmessagingTemplate.doSend(GenericmessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:396)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:78)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:455)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:429)
    at org.springframework.kafka.listener.adapter.retryingMessageListenerAdapter.lambda$onMessage$0(retryingMessageListenerAdapter.java:120)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255)
    at org.springframework.kafka.listener.adapter.retryingMessageListenerAdapter.onMessage(retryingMessageListenerAdapter.java:114)
    at org.springframework.kafka.listener.adapter.retryingMessageListenerAdapter.onMessage(retryingMessageListenerAdapter.java:40)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2069)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2051)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1988)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1928)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1814)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1531)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1178)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.lang.Thread.run(Thread.java:829)

但是服务激活器只显示与消费者相关的错误

 ERROR c.i.f.e.d.DedupeProcessorApplication - intuit_tid= Error caughtErrorMessage [payload=org.springframework.messaging.MessageDeliveryException: dispatcher has no subscribers for channel 'application-1.process-in-0'.; nested exception is org.springframework.integration.MessagedispatchingException: dispatcher has no subscribers,FailedMessage=Genericmessage [payload=byte[43],headers={kafka_offset=4,kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7da2cc8f,deliveryAttempt=3,kafka_timestampType=CREATE_TIME,kafka_receivedPartitionId=0,kafka_receivedTopic=source,kafka_receivedTimestamp=1620856185102,kafka_groupId=processors}],headers={kafka_data=ConsumerRecord(topic = source,partition = 0,leaderEpoch = 0,offset = 4,CreateTime = 1620856185102,serialized key size = -1,serialized value size = 43,headers = RecordHeaders(headers = [RecordHeader(key = contentType,value = [97,112,108,105,99,97,116,111,110,47,106,115,110]),RecordHeader(key = spring_json_header_types,value = [123,34,101,84,121,58,118,46,103,83,114,125])],isReadOnly = false),key = null,value = [B@4cb51739),id=a2fef876-a0ab-c825-9ee6-50b355044e51,sourceData=ConsumerRecord(topic = source,timestamp=1620856206192}] for original Genericmessage [payload=byte[43],kafka_groupId=processors}]

只需将主题更改为非动态,即从功能删除以下行

// headers.put("spring.cloud.stream.sendto.destination","processed");

并将目标主题添加到配置中,

      process-out-0:
          destination: processed
          producer:
            error-channel-enabled: true

能够在 ServiceActivator 中获得生产者错误。在下面记录

Error caughtErrorMessage [payload=org.springframework.integration.kafka.support.KafkaSendFailureException: nested exception is org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for processed-0:120000 ms has passed since batch creation,headers={contentType=application/json,id=0ba150e5-8e6f-81f1-b9c4-cda115171ce0,timestamp=1620856522352}] [record=ProducerRecord(topic=processed,partition=null,headers=RecordHeaders(headers = [RecordHeader(key = contentType,isReadOnly = true),key=null,value=[B@339a63df,timestamp=null)],headers={id=1acaece7-c596-c9ff-440b-37df824bd564,timestamp=1620856642357}]

我的问题是,当目标主题在消息的标头中动态设置且未配置时,我如何掌握错误

解决方法

我可能应该写一篇我可以参考的博客,但简短的回答是:出于一个简单的原因,我们在 sc-stream 方面无能为力来处理由于响应式函数而产生的错误 - 我们绝对没有控制流中发生的事情。

响应式函数和命令式函数的根本区别在于,命令式函数充当消息侦听器,并由我们(框架)在每条消息上调用。相比之下,响应式函数仅在应用程序初始化期间被调用一次,我们将源流连接到用户函数公开的发布者,然后我们将完全控制权交给用户。

所以这是一个工作单元的问题,在命令式函数中,这样的单元是消息,而在响应式函数中,这样的单元是整个消息流,它可能是无限的。

这一切意味着,在编写反应式函数时,您必须依赖反应式 API 本身丰富的错误处理能力。