问题描述
我有两个微服务通过 Kafka 相互交互,即一个发布消息而另一个使用它们。发布者和消费者都在 Quarkus (1.12.0.Final) 上运行,并使用响应式消息传递和 Mutiny。
制作人:
package myproducer;
import myavro.MyAvro;
import io.smallrye.mutiny.Uni;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Message;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import java.util.concurrent.CompletableFuture;
@ApplicationScoped
public class Publisher {
@Channel("mytopic")
@Inject
public Emitter<MyAvro> myTopic;
@Override
public Uni<Void> publish(MyModel model) {
MyAvro avro = Mymodelmapper.INSTANCE.modelToAvro(model);
return Uni.createFrom().emitter(e -> myTopic.send(Message.of(avro)
.addMetadata(toOutgoingKafkaRecordMetadata(avro))
.withAck(() -> {
e.complete(null);
return CompletableFuture.completedFuture(null);
})));
}
}
消费者:
package myconsumer;
import myavro.MyAvro;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import javax.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class Consumer {
@Incoming("mytopic")
public Uni<Void> consume(IncomingKafkaRecord<String,MyAvro> message) {
MyModel model = Mymodelmapper.INSTANCE.avroToModel(message.getPayload());
return ...;
}
}
依赖: 包括人工制品
- quarkus-smallrye-reactive-messaging-kafka
- quarkus-resteasy-mutiny
- quarkus-smallrye-opentracing
- 夸库斯兵变
- opentracing-kafka-client
Quarkus 配置(application.properties): 其中包括
quarkus.jaeger.service-name=myservice
quarkus.jaeger.sampler-type=const
quarkus.jaeger.sampler-param=1
quarkus.log.console.format=%d{yyyy-MM-dd HH:mm:ss} %-5p traceId=%X{traceId},spanId=%X{spanId},sampled=%X{sampled} [%c{2.}] (%t) %s%e%n
mp.messaging.incoming.mytopic.topic=abc
mp.messaging.incoming.mytopic.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.mytopic.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
...
mp.messaging.incoming.mytopic.interceptor.classes=io.opentracing.contrib.kafka.TracingConsumerInterceptor
使用此设置,根本不会记录 traceId 或 spanId(即使它们应该根据 Quarkus 的“使用 OpenTracing”指南)。添加@org.eclipse.microprofile.opentracing.Traced后才设置traceId和spanId,但在生产者和消费者上两者完全不相关。
我根据前面提到的 Quarkus 指南“使用 OpenTracing”检查了我的 opentracing 配置,但没有发现我这边配置错误的提示。 在阅读了与 Mutiny 一起使用时依赖 ThreadLocals 的某些 Quarkus 扩展中的问题的讨论后,我将人工制品 quarkus-smallrye-context-propagation 添加到我的依赖项中,但无济于事。
我怀疑这个问题可能与 https://github.com/quarkusio/quarkus/issues/15182 有关,尽管它是关于反应式路由而不是反应式消息传递。
有什么想法吗?
解决方法
这个问题不好解决,先解释一下是怎么回事。
OpenTracing 有事务和跨度的概念。 Span 是执行块(方法、数据库调用、发送到 Kafka 主题),而事务是跨越多个组件(一组 Span)的分布式进程。
这里的问题是,每次创建跨度时,它都找不到任何 OpenTracing 事务,因此它会创建一个新事务。这就是为什么您的所有跨度都没有相互关联的原因。
在 OpenTracing 中,当您创建跨度时,您将根据跨度上下文创建它。每个 OpenTracing 集成都会基于扩展技术创建一个 span 上下文(我没有找到更好的术语),例如,HTTP span 上下文基于 HTTP 标头,而 Kafka span 上下文基于 Kafka标题。
因此,要关联两个跨度,您需要使用来自底层技术的一些上下文创建跨度上下文,提供正确的OpenTracing ID。
例如,要关联两个 Kafka 跨度,您需要有一个 uber-trace-id
标头(这是 Jaeger 中 OpenTracing id 的默认名称)和跟踪标识符(格式参见 tracespan-identity这个标题)。
知道了这一点,有很多事情要做。
首先,您需要在 uber-trace-id
方法的传出消息中添加一个 @Traced
Kafka 标头,以将方法的范围与在 Kafka 生产者拦截器中创建的范围相关联。
Tracer tracer = GlobalTracer.get(); // you can also inject it
JaegerSpanContext spanCtx = ((JaegerSpan)tracer.activeSpan()).context();
// uber-trace-id format: {trace-id}:{span-id}:{parent-span-id}:{flags}
//see https://www.jaegertracing.io/docs/1.21/client-libraries/#tracespan-identity
var uberTraceId = spanCtx.getTraceId() + ":" +
Long.toHexString(spanCtx.getSpanId()) + ":" +
Long.toHexString(spanCtx.getParentId()) + ":" +
Integer.toHexString(spanCtx.getFlags());
headers.add("uber-trace-id",openTracingId.getBytes());
然后,您需要将您的 @Traced
方法与传入消息的跨度(如果有)相关联。为此,最简单的方法是添加一个 CDI 拦截器,该拦截器将尝试根据方法参数(它将搜索 @Traced
参数)为所有用 Message
注释的方法创建 span 上下文。为此,该拦截器需要在 OpenTracing 拦截器之前执行,并在拦截器上下文中设置 span 上下文。
这是我们的拦截器实现,您可以随意使用它或根据您的需要调整它。
public class KafkaRecordOpenTracingInterceptor {
@AroundInvoke
public Object propagateSpanCtx(InvocationContext ctx) throws Exception {
for (int i = 0 ; i < ctx.getParameters().length ; i++) {
Object parameter = ctx.getParameters()[i];
if (parameter instanceof Message) {
Message message = (Message) parameter;
Headers headers = message.getMetadata(IncomingKafkaRecordMetadata.class)
.map(IncomingKafkaRecordMetadata::getHeaders)
.get();
SpanContext spanContext = getSpanContext(headers);
ctx.getContextData().put(OpenTracingInterceptor.SPAN_CONTEXT,spanContext);
}
}
return ctx.proceed();
}
private SpanContext getSpanContext(Headers headers) {
return TracingKafkaUtils.extractSpanContext(headers,GlobalTracer.get());
}
}
此代码同时使用 Quarkus OpenTracing 扩展和 Kafka OpenTracing contrib 库。
由于添加了从当前 span 上下文创建的 OpenTracing Kafka Header 以及从传入消息的头创建上下文,因此传出消息的相关性在任何情况下都应该发生。