当 Spring Sleuth 在类路径中时,为什么跟踪信息不会通过 kafka 消息传播?

问题描述

由于未触发 SleuthKafkaAspect.wrapProducerFactory() 方法,因此跟踪信息不会通过 kafka 消息传播。 在生产者端,消息被正确发送并且跟踪信息被正确记录。在消费者方面,而是创建了一个新的 traceId 和 spanId。

以下两个日志记录行显示了 traceId、spanId(和 parentId)的不同值:

2021-03-23 11:42:30.158 [http-nio-9185-exec-2] INFO  my.company.Producer - /4afe07273872918b/4afe07273872918b// - Sending event='MyEvent'
2021-03-23 11:42:54.374 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO my.company.Consumer /1fec3bf6a3c91773/ff4bd26b2e509ed8/1fec3bf6a3c91773/ - Received new event='MyEvent'

首先,使用 Krafdrop 和调试,我验证消息头不包含任何跟踪信息。

在那之后,我发现方法 SleuthKafkaAspect.wrapProducerFactory() 永远不会被触发,而是在消费者方面 SleuthKafkaAspect.anyConsumerFactory() 方法是。

使用的库版本如下:

  • 弹簧靴:2.3.7.RELEASE
  • spring cloud bom:Hoxton.SR10
  • spring cloud:2.2.7.RELEASE(和 2.2.5.RELEASE)
  • 春季卡夫卡:2.5.10.RELEASE
  • kakfa 客户端:2.4.1
  • spring-cloud-starter-sleuth:2.2.7.RELEASE
  • spring-cloud-sleuth-zipkin:2.2.7.RELEASE

kakfa 客户端库版本为 2.4.1 是由于与 2.5.1 版 kafka 客户端上的生产错误相关的版本降级导致 cpu 使用率增加。 我也尝试使用以下库版本组合但没有成功:

  • 弹簧靴:2.3.7.RELEASE
  • spring cloud bom:Hoxton.SR10(和 Hoxton.SR8)
  • spring cloud:2.2.7.RELEASE(和 2.2.5.RELEASE)
  • 春季卡夫卡:2.5.10.RELEASE
  • kakfa 客户端:2.5.1
  • spring-cloud-starter-sleuth:2.2.7.RELEASE(和 2.2.5.RELEASE)
  • spring-cloud-sleuth-zipkin:2.2.7.RELEASE(和 2.2.5.RELEASE)
  • 弹簧靴:2.3.7.RELEASE
  • spring cloud bom:Hoxton.SR10(和 Hoxton.SR8)
  • spring cloud:2.2.7.RELEASE(和 2.2.5.RELEASE)
  • 春季卡夫卡:2.5.10.RELEASE
  • kakfa 客户端:2.6.0
  • spring-cloud-starter-sleuth:2.2.7.RELEASE(和 2.2.5.RELEASE)
  • spring-cloud-sleuth-zipkin:2.2.7.RELEASE(和 2.2.5.RELEASE)
  • 弹簧靴:2.3.7.RELEASE
  • spring cloud bom:Hoxton.SR10(和 Hoxton.SR8)
  • spring cloud:2.2.7.RELEASE(和 2.2.5.RELEASE)
  • 弹簧卡夫卡:2.6.x
  • kakfa 客户端:2.6.0
  • spring-cloud-starter-sleuth:2.2.7.RELEASE(和 2.2.5.RELEASE)
  • spring-cloud-sleuth-zipkin:2.2.7.RELEASE(和 2.2.5.RELEASE)

我们将项目迁移到了不同​​的 Spring Boot 版本,从 2.3.0.RELEASE 到 2.3.7.RELEASE。在一切正常工作之前。 在旧库版本下方:

  • spring-boot: 2.3.0.RELEASE
  • spring-kafka: 2.5.0.RELEASE
  • kafka 客户端:2.4.1
  • spring-cloud:2.2.5.RELEASE
  • spring-cloud-starter-sleuth:2.2.5.RELEASE
  • spring-cloud-sleuth-zipkin:2.2.5.RELEASE

我们还引入了 log42/log4j(之前是带有 logback 的 slf4j)。

在相关库下面:

- org.springframework.boot:spring-boot-starter-log4j2:jar:2.3.7.RELEASE:compile
- org.slf4j:jul-to-slf4j:jar:1.7.30:compile
- io.projectreactor:reactor-test:jar:3.3.12.RELEASE:test
- io.projectreactor:reactor-core:jar:3.3.12.RELEASE:test
- org.reactivestreams:reactive-streams:jar:1.0.3:test

配置的属性如下:

spring.sleuth.messaging.enabled=true
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.client-id=myClientIdentifier
spring.kafka.consumer.group-id=MyConsumerGroup
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

ProducerFactory 创建的配置类如下:


@Configuration
@EnableTransactionManagement
public class KafkaProducerConfig {

    KafkaProperties kafkaProperties;

    @Autowired
    public KafkaProducerConfig(
            KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }

    @Bean
    public KafkaTemplate<String,Object> kafkaTemplate() {
        KafkaTemplate<String,Object> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        return kafkaTemplate;
    }


    private ProducerFactory<String,Object> producerFactory() {
        DefaultKafkaProducerFactory<String,Object> defaultKafkaProducerFactory =
                new DefaultKafkaProducerFactory<>(producerConfigs());
        //defaultKafkaProducerFactory.transactionCapable();
        //defaultKafkaProducerFactory.setTransactionIdPrefix("tx-");
        return defaultKafkaProducerFactory;
    }

    private Map<String,Object> producerConfigs() {

        Map<String,Object> configs = kafkaProperties.buildProducerProperties();
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONfig,StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONfig,JsonSerializer.class);
        return configs;
    }

}

我的spring boot应用类:


@Profile("DEV")
@SpringBootApplication(
        scanBasePackages = {"my.company"},exclude = {
                DataSourceAutoConfiguration.class,DataSourceTransactionManagerAutoConfiguration.class,HibernateJpaAutoConfiguration.class
        }
)
@EnableSwagger2
@EnableFeignClients(basePackages = {"my.company.common","my.company.integration"})
@EnableTransactionManagement
@EnableMongoRepositories(basePackages = {
        "my.company.repository"})
@EnableMBeanExport(registration = RegistrationPolicy.IGnorE_EXISTING)
@ServletComponentScan
public class DevAppStartup extends SpringBootServletinitializer {

    public static void main(String[] args) {
        SpringApplication.run(DevAppStartup.class,args);
    }

}

在这里你可以找到命令“mvn dependency:tree”的输出 mvn_dependency_tree.txt

解决方法

As the documentation suggests,如果你想使用自己的ProducerFactory,你需要创建一个KafkaTemplate bean:

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String,Object>producerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
    }

    @Bean
    public KafkaTemplate<String,Object> kafkaTemplate(ProducerFactory<String,Object>producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}