如何使用spring-kafka和kafka-streams在KStreams Bean中记录偏移量

问题描述

我已经通过处理器API的transform()或process()方法提到了关于在KStreams上记录偏移量的几乎所有问题,就像这里许多问题中提到的那样-

How can I get the offset value in KStream

但是我无法获得这些答案的解决方案,所以我在问这个问题。

每次流使用消息时,我想记录分区,消费者组ID和偏移量,我不知道如何将process()或transform()方法与ProcessorContext API集成在一起?而且,如果我要在CustomParser类中实现Processor接口,那么我将必须实现所有方法,但是我不确定那是否可以工作,就像在有关记录元数据的文档中提到的那样-https://docs.confluent.io/current/streams/developer-guide/processor-api.html#streams-developer-guide-processor-api >

我已经在如下所示的spring-boot应用程序中设置了KStreams(供参考,请更改变量名称

 @Bean
    public Set<KafkaStreams> myKStreamJson(StreamsBuilder profileBuilder) {
        Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer,jsonDeserializer);

        final KStream<String,JsonNode> pStream = myBuilder.stream(inputTopic,Consumed.with(Serdes.String(),jsonSerde));

        Properties props = streamsConfig.kStreamsConfigs().asProperties();
       

        pstream
                .map((key,value) -> {
                            try {
                                return CustomParser.parse(key,value);
                            } catch (Exception e) {
                                LOGGER.error("Error occurred - " + e.getMessage());
                            }
                            return new keyvalue<>(null,null);
                        }
                )
                .filter((key,value) -> {
                    try {
                        return MessageFilter.filterNonNull(key,value);
                    } catch (Exception e) {
                        LOGGER.error("Error occurred - " + e.getMessage());
                    }
                    return false;
                })
                .through(
                        outputTopic,Produced.with(Serdes.String(),new JsonPOJOSerde<>(TransformedMessage.class)));

        return Sets.newHashSet(
                new KafkaStreams(profileBuilder.build(),props)
        );
    }

解决方法

实施Transformer;将ProcessorContext中的init()保存下来;然后您可以访问transform()中的记录元数据,只需返回原始键/值即可。

这里是example of a Transformer。 Spring为Apache Kafka提供了此功能,以调用Spring Integration流来转换键/值。