Sleuth 跟踪不适用于事务性 Kafka 生产者

问题描述

目前,我们正在使用事务性 Kafka 生产者。我们注意到的是 Kafka 的跟踪方面丢失了,这意味着我们无法看到 Kafka 生产者的检测,从而丢失了 b3 标头。

在查看代码后,我们发现事务生产者没有调用后处理器,这意味着 TracingProducer 永远不会由 TraceProducerPostProcessor 创建。有什么原因吗?此外,为事务生产者启用跟踪的工作是什么?似乎没有一个地方可以轻松创建跟踪生产者(DefaultKafkaProducerFactory #doCreateTxProducer 是私有的)

附加屏幕截图(DefaultKafkaProducerFactory 类)。在屏幕截图中,您可以看到仅对原始生产者调用后处理器,而不是事务性生产者的情况。

非常感谢您的帮助。

谢谢

DefaultKafkaProducerFactory#createRawProducer

解决方法

??

createRawProducer() 为事务性和非事务性生产者调用:

enter image description here

正在发生其他事情。

编辑

问题是侦探用不同的人替换了生产者,但工厂丢弃了它并使用了原始的。

https://github.com/spring-projects/spring-kafka/issues/1778

EDIT2

实际上,我们在这里丢弃跟踪生产者是一件好事; Sleuth 还将工厂包装在代理中,并将 CloseSafeProducer 包装在 TracingProducer 中;但我看到交易和非交易生产者的结果相同......

@SpringBootApplication
public class So67194702Application {

    public static void main(String[] args) {
        SpringApplication.run(So67194702Application.class,args);
    }

    @Bean
    public ApplicationRunner runner(ProducerFactory<String,String> pf) {
        return args -> {
            Producer<String,String> prod = pf.createProducer();
            prod.close();
        };
    }

}

close()... 上放置断点

enter image description here

,

感谢 Gary Russell 的快速回复。事务性和非事务性消费者都有效地调用了 createRawConsumer。

Sleuth 正在使用 TraceConsumerPostProcessor 将 Kafka 消费者包装到 TracingConsumer 中。由于 ProducerPostProcessor 接口扩展了 Function 接口,我们可能假设可以/应该使用函数的结果,但是 DefaultKafkaProducerFactory 的 createRawConsumer 方法正在应用后处理器而不使用返回类型。导致此特定情况下的问题。

所以,我们不能修改 createRawConsumer 的实现来分配后处理器的结果。如果没有,让后处理器扩展消费者而不是功能不是更好吗?

通过如下覆盖 createRawConsumer 方法进行的成功测试

    @Override
    protected Producer<K,V> createRawProducer(Map<String,Object> rawConfigs) {
        Producer<K,V> kafkaProducer = new KafkaProducer<>(rawConfigs,getKeySerializerSupplier().get(),getValueSerializerSupplier().get());
        for (ProducerPostProcessor<K,V> pp : getPostProcessors()) {
            kafkaProducer = pp.apply(kafkaProducer);
        }
        return kafkaProducer;
    }

感谢您的帮助。