使用响应式消息传递时,未在 Quarkus 上传播 Opentracing 上下文

问题描述

我有两个微服务通过 Kafka 相互交互,即一个发布消息而另一个使用它们。发布者和消费者都在 Quarkus (1.12.0.Final) 上运行,并使用响应式消息传递和 Mutiny。

制作人:

package myproducer;

import myavro.MyAvro;
import io.smallrye.mutiny.Uni;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Message;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import java.util.concurrent.CompletableFuture;


@ApplicationScoped
public class Publisher {  
  @Channel("mytopic")
  @Inject
  public Emitter<MyAvro> myTopic;

  @Override
  public Uni<Void> publish(MyModel model) {
    MyAvro avro = Mymodelmapper.INSTANCE.modelToAvro(model);

    return Uni.createFrom().emitter(e -> myTopic.send(Message.of(avro)
                                                .addMetadata(toOutgoingKafkaRecordMetadata(avro))
                                                .withAck(() -> {
                                                     e.complete(null);
                                                     return CompletableFuture.completedFuture(null);
                                                })));
  }
}

消费者:

package myconsumer;

import myavro.MyAvro;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class Consumer {

  @Incoming("mytopic")
  public Uni<Void> consume(IncomingKafkaRecord<String,MyAvro> message) {
    MyModel model = Mymodelmapper.INSTANCE.avroToModel(message.getPayload());

    return ...;
  }

}

依赖: 包括人工制品

  • quarkus-smallrye-reactive-messaging-kafka
  • quarkus-resteasy-mutiny
  • quarkus-smallrye-opentracing
  • 夸库斯兵变
  • opentracing-kafka-client

Quarkus 配置(application.properties): 其中包括

quarkus.jaeger.service-name=myservice
quarkus.jaeger.sampler-type=const
quarkus.jaeger.sampler-param=1
quarkus.log.console.format=%d{yyyy-MM-dd HH:mm:ss} %-5p traceId=%X{traceId},spanId=%X{spanId},sampled=%X{sampled} [%c{2.}] (%t) %s%e%n

mp.messaging.incoming.mytopic.topic=abc
mp.messaging.incoming.mytopic.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.mytopic.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
...
mp.messaging.incoming.mytopic.interceptor.classes=io.opentracing.contrib.kafka.TracingConsumerInterceptor

使用此设置,根本不会记录 traceId 或 spanId(即使它们应该根据 Quarkus 的“使用 OpenTracing”指南)。添加@org.eclipse.microprofile.opentracing.Traced后才设置traceId和spanId,但在生产者和消费者上两者完全不相关。

我根据前面提到的 Quarkus 指南“使用 OpenTracing”检查了我的 opentracing 配置,但没有发现我这边配置错误提示。 在阅读了与 Mutiny 一起使用时依赖 ThreadLocals 的某些 Quarkus 扩展中的问题的讨论后,我将人工制品 quarkus-smallrye-context-propagation 添加到我的依赖项中,但无济于事。

我怀疑这个问题可能与 https://github.com/quarkusio/quarkus/issues/15182 有关,尽管它是关于反应式路由而不是反应式消息传递。

有什么想法吗?

解决方法

这个问题不好解决,先解释一下是怎么回事。

OpenTracing 有事务和跨度的概念。 Span 是执行块(方法、数据库调用、发送到 Kafka 主题),而事务是跨越多个组件(一组 Span)的分布式进程。

这里的问题是,每次创建跨度时,它都找不到任何 OpenTracing 事务,因此它会创建一个新事务。这就是为什么您的所有跨度都没有相互关联的原因。

在 OpenTracing 中,当您创建跨度时,您将根据跨度上下文创建它。每个 OpenTracing 集成都会基于扩展技术创建一个 span 上下文(我没有找到更好的术语),例如,HTTP span 上下文基于 HTTP 标头,而 Kafka span 上下文基于 Kafka标题。

因此,要关联两个跨度,您需要使用来自底层技术的一些上下文创建跨度上下文,提供正确的OpenTracing ID

例如,要关联两个 Kafka 跨度,您需要有一个 uber-trace-id 标头(这是 Jaeger 中 OpenTracing id 的默认名称)和跟踪标识符(格式参见 tracespan-identity这个标题)。

知道了这一点,有很多事情要做。

首先,您需要在 uber-trace-id 方法的传出消息中添加一个 @Traced Kafka 标头,以将方法的范围与在 Kafka 生产者拦截器中创建的范围相关联。

Tracer tracer = GlobalTracer.get(); // you can also inject it
JaegerSpanContext spanCtx = ((JaegerSpan)tracer.activeSpan()).context();
// uber-trace-id format: {trace-id}:{span-id}:{parent-span-id}:{flags}
//see https://www.jaegertracing.io/docs/1.21/client-libraries/#tracespan-identity
var uberTraceId = spanCtx.getTraceId() + ":" +
        Long.toHexString(spanCtx.getSpanId()) + ":" +
        Long.toHexString(spanCtx.getParentId()) + ":" +
        Integer.toHexString(spanCtx.getFlags());
headers.add("uber-trace-id",openTracingId.getBytes());

然后,您需要将您的 @Traced 方法与传入消息的跨度(如果有)相关联。为此,最简单的方法是添加一个 CDI 拦截器,该拦截器将尝试根据方法参数(它将搜索 @Traced 参数)为所有用 Message 注释的方法创建 span 上下文。为此,该拦截器需要在 OpenTracing 拦截器之前执行,并在拦截器上下文中设置 span 上下文。

这是我们的拦截器实现,您可以随意使用它或根据您的需要调整它。

public class KafkaRecordOpenTracingInterceptor {

    @AroundInvoke
    public Object propagateSpanCtx(InvocationContext ctx) throws Exception {
        for (int i = 0 ; i < ctx.getParameters().length ; i++) {
            Object parameter = ctx.getParameters()[i];

            if (parameter instanceof Message) {
                Message message = (Message) parameter;

                Headers headers = message.getMetadata(IncomingKafkaRecordMetadata.class)
                    .map(IncomingKafkaRecordMetadata::getHeaders)
                    .get();
                SpanContext spanContext = getSpanContext(headers);
                ctx.getContextData().put(OpenTracingInterceptor.SPAN_CONTEXT,spanContext);
            }
        }

        return ctx.proceed();
    }

    private SpanContext getSpanContext(Headers headers) {
        return TracingKafkaUtils.extractSpanContext(headers,GlobalTracer.get());
    }
}

此代码同时使用 Quarkus OpenTracing 扩展和 Kafka OpenTracing contrib 库。

由于添加了从当前 span 上下文创建的 OpenTracing Kafka Header 以及从传入消息的头创建上下文,因此传出消息的相关性在任何情况下都应该发生。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...