问题描述
我们在 Spring Cloud Stream 上有一个与 Project Reactor 集成的应用程序。我们通过在 Message 头中设置 spring.cloud.stream.sendto.destination 来动态设置目标主题并发布消息。 我们正在寻求处理错误情况,例如 kafka 服务器间歇性关闭或发布时主题不可用。 我们已经实现了@ServiceActivator 来处理所有错误。动态设置主题时,ServiceActivator 不会捕获生产者错误,只有消费者错误会命中 ServiceActivator。 如果我们提前设置 out.destination 主题,那么生产者错误也会被捕获。 有什么方法可以为动态目的地启用 errorChannel 吗? 下面的示例代码, 应用程序.yml
spring:
cloud:
function:
deFinition: generate_flux;process
stream:
kafka:
binder:
brokers: localhost:9094
required-acks: all
producer-properties:
retries: 2
bindings:
generate_flux-out-0:
destination: source
process-in-0:
destination: source
group: processors
process-out-0:
producer:
error-channel-enabled: true
@Bean
public Function<Flux<Message<String>>,Flux<Message<String>>> process() {
return (msg) -> {
return msg.flatMap(
message -> {
log.info("individual msg " + message);
Map<String,Object> headers = new HashMap<>();
headers.put("spring.cloud.stream.sendto.destination","processed");
MessageHeaders mh = new MessageHeaders(headers);
return Mono.just(MessageBuilder.createMessage("vvvv processed ",mh));
});
};
}
服务激活器:
@ServiceActivator(inputChannel = "errorChannel")
public void handle(final ErrorMessage em) {
log.error("Error caught" + em.toString());
}
启动 spring boot 应用程序,确保消息被消费和发布,然后删除主题“已处理”以创建失败场景。
有了以上, Producer 出现 TimeoutException,但 Service Activator errorChannel 未捕获:
Caused by: org.apache.kafka.common.errors.TimeoutException: Topic processed not present in Metadata after 60000 ms.
org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@f258f76]; nested exception is org.springframework.kafka.KafkaException: Send Failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic processed not present in Metadata after 60000 ms.,FailedMessage=Genericmessage [payload=byte[15],headers={id=13dc2799-45c5-0c10-2e62-4e296f62d1dc,spring.cloud.stream.sendto.destination=processed,contentType=application/json,timestamp=1620856125106}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.FluxPeekFuseable] :
reactor.core.publisher.Flux.doOnNext
org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.lambda$bindFunctionToDestinations$9(FunctionConfiguration.java:501)
Error has been observed at the following site(s):
|_ Flux.doOnNext ⇢ at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.lambda$bindFunctionToDestinations$9(FunctionConfiguration.java:501)
Stack trace:
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1041)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.integration.dispatcher.Abstractdispatcher.tryOptimizeddispatch(Abstractdispatcher.java:115)
at org.springframework.integration.dispatcher.Unicastingdispatcher.dodispatch(Unicastingdispatcher.java:133)
at org.springframework.integration.dispatcher.Unicastingdispatcher.dispatch(Unicastingdispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.lambda$null$7(FunctionConfiguration.java:512)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:196)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:487)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:420)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:199)
at reactor.core.publisher.UnicastManySinkNoBackpressure.tryEmitNext(UnicastManySinkNoBackpressure.java:119)
at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:97)
at org.springframework.integration.util.IntegrationReactiveUtils.lambda$null$8(IntegrationReactiveUtils.java:141)
at org.springframework.integration.dispatcher.Abstractdispatcher.tryOptimizeddispatch(Abstractdispatcher.java:115)
at org.springframework.integration.dispatcher.Unicastingdispatcher.dodispatch(Unicastingdispatcher.java:133)
at org.springframework.integration.dispatcher.Unicastingdispatcher.dispatch(Unicastingdispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericmessagingTemplate.doSend(GenericmessagingTemplate.java:187)
at org.springframework.messaging.core.GenericmessagingTemplate.doSend(GenericmessagingTemplate.java:166)
at org.springframework.messaging.core.GenericmessagingTemplate.doSend(GenericmessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:396)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:78)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:455)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:429)
at org.springframework.kafka.listener.adapter.retryingMessageListenerAdapter.lambda$onMessage$0(retryingMessageListenerAdapter.java:120)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255)
at org.springframework.kafka.listener.adapter.retryingMessageListenerAdapter.onMessage(retryingMessageListenerAdapter.java:114)
at org.springframework.kafka.listener.adapter.retryingMessageListenerAdapter.onMessage(retryingMessageListenerAdapter.java:40)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2069)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2051)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1988)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1928)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1814)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1531)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1178)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.springframework.kafka.KafkaException: Send Failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic processed not present in Metadata after 60000 ms.
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:574)
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:389)
at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:517)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:134)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1041)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.integration.dispatcher.Abstractdispatcher.tryOptimizeddispatch(Abstractdispatcher.java:115)
at org.springframework.integration.dispatcher.Unicastingdispatcher.dodispatch(Unicastingdispatcher.java:133)
at org.springframework.integration.dispatcher.Unicastingdispatcher.dispatch(Unicastingdispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.lambda$null$7(FunctionConfiguration.java:512)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:196)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:487)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:420)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:199)
at reactor.core.publisher.UnicastManySinkNoBackpressure.tryEmitNext(UnicastManySinkNoBackpressure.java:119)
at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:97)
at org.springframework.integration.util.IntegrationReactiveUtils.lambda$null$8(IntegrationReactiveUtils.java:141)
at org.springframework.integration.dispatcher.Abstractdispatcher.tryOptimizeddispatch(Abstractdispatcher.java:115)
at org.springframework.integration.dispatcher.Unicastingdispatcher.dodispatch(Unicastingdispatcher.java:133)
at org.springframework.integration.dispatcher.Unicastingdispatcher.dispatch(Unicastingdispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericmessagingTemplate.doSend(GenericmessagingTemplate.java:187)
at org.springframework.messaging.core.GenericmessagingTemplate.doSend(GenericmessagingTemplate.java:166)
at org.springframework.messaging.core.GenericmessagingTemplate.doSend(GenericmessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:396)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:78)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:455)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:429)
at org.springframework.kafka.listener.adapter.retryingMessageListenerAdapter.lambda$onMessage$0(retryingMessageListenerAdapter.java:120)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255)
at org.springframework.kafka.listener.adapter.retryingMessageListenerAdapter.onMessage(retryingMessageListenerAdapter.java:114)
at org.springframework.kafka.listener.adapter.retryingMessageListenerAdapter.onMessage(retryingMessageListenerAdapter.java:40)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2069)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2051)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1988)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1928)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1814)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1531)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1178)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:829)
ERROR c.i.f.e.d.DedupeProcessorApplication - intuit_tid= Error caughtErrorMessage [payload=org.springframework.messaging.MessageDeliveryException: dispatcher has no subscribers for channel 'application-1.process-in-0'.; nested exception is org.springframework.integration.MessagedispatchingException: dispatcher has no subscribers,FailedMessage=Genericmessage [payload=byte[43],headers={kafka_offset=4,kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7da2cc8f,deliveryAttempt=3,kafka_timestampType=CREATE_TIME,kafka_receivedPartitionId=0,kafka_receivedTopic=source,kafka_receivedTimestamp=1620856185102,kafka_groupId=processors}],headers={kafka_data=ConsumerRecord(topic = source,partition = 0,leaderEpoch = 0,offset = 4,CreateTime = 1620856185102,serialized key size = -1,serialized value size = 43,headers = RecordHeaders(headers = [RecordHeader(key = contentType,value = [97,112,108,105,99,97,116,111,110,47,106,115,110]),RecordHeader(key = spring_json_header_types,value = [123,34,101,84,121,58,118,46,103,83,114,125])],isReadOnly = false),key = null,value = [B@4cb51739),id=a2fef876-a0ab-c825-9ee6-50b355044e51,sourceData=ConsumerRecord(topic = source,timestamp=1620856206192}] for original Genericmessage [payload=byte[43],kafka_groupId=processors}]
// headers.put("spring.cloud.stream.sendto.destination","processed");
process-out-0:
destination: processed
producer:
error-channel-enabled: true
能够在 ServiceActivator 中获得生产者错误。在下面记录
Error caughtErrorMessage [payload=org.springframework.integration.kafka.support.KafkaSendFailureException: nested exception is org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for processed-0:120000 ms has passed since batch creation,headers={contentType=application/json,id=0ba150e5-8e6f-81f1-b9c4-cda115171ce0,timestamp=1620856522352}] [record=ProducerRecord(topic=processed,partition=null,headers=RecordHeaders(headers = [RecordHeader(key = contentType,isReadOnly = true),key=null,value=[B@339a63df,timestamp=null)],headers={id=1acaece7-c596-c9ff-440b-37df824bd564,timestamp=1620856642357}]
我的问题是,当目标主题在消息的标头中动态设置且未配置时,我如何掌握错误?
解决方法
我可能应该写一篇我可以参考的博客,但简短的回答是:出于一个简单的原因,我们在 sc-stream 方面无能为力来处理由于响应式函数而产生的错误 - 我们绝对没有控制流中发生的事情。
响应式函数和命令式函数的根本区别在于,命令式函数充当消息侦听器,并由我们(框架)在每条消息上调用。相比之下,响应式函数仅在应用程序初始化期间被调用一次,我们将源流连接到用户函数公开的发布者,然后我们将完全控制权交给用户。
所以这是一个工作单元的问题,在命令式函数中,这样的单元是消息,而在响应式函数中,这样的单元是整个消息流,它可能是无限的。
这一切意味着,在编写反应式函数时,您必须依赖反应式 API 本身丰富的错误处理能力。