问题描述
我已经通过处理器API的transform()或process()方法提到了关于在KStreams上记录偏移量的几乎所有问题,就像这里许多问题中提到的那样-
How can I get the offset value in KStream
但是我无法获得这些答案的解决方案,所以我在问这个问题。
每次流使用消息时,我想记录分区,消费者组ID和偏移量,我不知道如何将process()或transform()方法与ProcessorContext API集成在一起?而且,如果我要在CustomParser类中实现Processor接口,那么我将必须实现所有方法,但是我不确定那是否可以工作,就像在有关记录元数据的文档中提到的那样-https://docs.confluent.io/current/streams/developer-guide/processor-api.html#streams-developer-guide-processor-api >
我已经在如下所示的spring-boot应用程序中设置了KStreams(供参考,请更改变量名称)
@Bean
public Set<KafkaStreams> myKStreamJson(StreamsBuilder profileBuilder) {
Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer,jsonDeserializer);
final KStream<String,JsonNode> pStream = myBuilder.stream(inputTopic,Consumed.with(Serdes.String(),jsonSerde));
Properties props = streamsConfig.kStreamsConfigs().asProperties();
pstream
.map((key,value) -> {
try {
return CustomParser.parse(key,value);
} catch (Exception e) {
LOGGER.error("Error occurred - " + e.getMessage());
}
return new keyvalue<>(null,null);
}
)
.filter((key,value) -> {
try {
return MessageFilter.filterNonNull(key,value);
} catch (Exception e) {
LOGGER.error("Error occurred - " + e.getMessage());
}
return false;
})
.through(
outputTopic,Produced.with(Serdes.String(),new JsonPOJOSerde<>(TransformedMessage.class)));
return Sets.newHashSet(
new KafkaStreams(profileBuilder.build(),props)
);
}
解决方法
实施Transformer
;将ProcessorContext
中的init()
保存下来;然后您可以访问transform()
中的记录元数据,只需返回原始键/值即可。
这里是example of a Transformer。 Spring为Apache Kafka提供了此功能,以调用Spring Integration流来转换键/值。