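Problem description
Following the reference project (https://github.com/burkaa01/jaeger-tracing-kafka/tree/master/stream-app), I created a stream pipeline with Jaeger enabled.
It is a Spring Boot application, but the bean configuration is defined in a Spring XML file.
As part of the stream topology, a transform step throws an error when it calls ProcessorContext.headers(). If Jaeger is disabled, the stream pipeline works.
It also works if the beans are defined with annotations instead of XML.
I found this JIRA, which may be related: https://issues.apache.org/jira/browse/KAFKA-4344
I have no clue what is going wrong. Here is the error stack for reference: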
java.lang.IllegalStateException: This should not happen as headers() should only be called while a record is processed
at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.headers(AbstractProcessorContext.java:147)
at com.hcl.nervIO.core.tracer.JaegerFilter.transform(JaegerFilter.java:34)
at com.hcl.nervIO.core.tracer.JaegerFilter.transform(JaegerFilter.java:22)
at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:47)
at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:36)
at org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:56)
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
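
One possible cause that fits the trace above, offered as an assumption and not a confirmed diagnosis: the transformer defined in XML may be a single instance shared by several stream tasks, so a later init() call overwrites the context stored by an earlier one. The sketch below reproduces that pattern using only the JDK. FakeContext and HeaderReader are hypothetical stand-ins invented for illustration. They are not Kafka classes, and JaegerFilter itself is not shown in the question.

```java
import java.util.function.Supplier;

public class SharedTransformerDemo {

    // Hypothetical stand-in for a per-task processor context; not a Kafka class.
    static final class FakeContext {
        private final String taskId;
        private boolean processing;

        FakeContext(String taskId) { this.taskId = taskId; }

        void setProcessing(boolean processing) { this.processing = processing; }

        // Mimics the guard in the trace: headers are readable only while a record is processed.
        String headers() {
            if (!processing) {
                throw new IllegalStateException(
                        "headers() called outside record processing, task " + taskId);
            }
            return "headers-of-" + taskId;
        }
    }

    // Hypothetical stand-in for a transformer that stores its context in init().
    static final class HeaderReader {
        private FakeContext context;

        void init(FakeContext context) { this.context = context; }

        String transform() { return context.headers(); }
    }

    // Sets up two tasks from one supplier and returns what task A sees, or the error message.
    static String run(Supplier<HeaderReader> supplier) {
        FakeContext ctxA = new FakeContext("A");
        FakeContext ctxB = new FakeContext("B");
        HeaderReader forA = supplier.get();
        forA.init(ctxA);
        HeaderReader forB = supplier.get();
        forB.init(ctxB); // with a shared instance this overwrites task A's context

        ctxA.setProcessing(true); // task A is mid-record, task B is idle
        try {
            return forA.transform();
        } catch (IllegalStateException e) {
            return "error: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        HeaderReader singleton = new HeaderReader();
        System.out.println(run(() -> singleton));   // shared instance: fails
        System.out.println(run(HeaderReader::new)); // fresh instance per get(): works
    }
}
```

If this is what is happening here, the usual remedy is to make sure TransformerSupplier.get() returns a new transformer on every call, for example by declaring the XML bean with scope="prototype" or by constructing the transformer inside the supplier instead of injecting one shared bean.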