Sping Boot Service按需使用kafka消息

问题描述

我要求在哪里需要一个Spring Boot Rest Service,客户端应用程序每30分钟将调用一次,服务将返回

  1. 基于查询参数中指定数量的最新消息数,例如http://messages.com/getNewMessages?number=10在这种情况下应返回10条消息

  2. 条消息的数量基于查询参数中指定的数量和偏移量,例如在这种情况下,http://messages.com/getSpecificMessages?number=5&start=123应该返回5条以偏移量123开始的消息。

我有一个简单的独立应用程序,它运行良好。这是我测试过的,可以为将其合并到服务中提供一些指导。

public static void main(String[] args) {
        // create kafka consumer
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"my-first-consumer-group");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,args[0]);

        Consumer<String,String> consumer = new KafkaConsumer<>(properties);

        // subscribe to topic
        consumer.subscribe(Collections.singleton("test"));
        consumer.poll(0);

        //get to specific offset and get specified number of messages
        for (TopicPartition partition : consumer.assignment())
            consumer.seek(partition,args[1]);

        ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(5000));

        System.out.println("Total Record Count ******* : " + records.count());      

        for (ConsumerRecord<String,String> record : records) {
            System.out.println("Message: " + record.value());
            System.out.println("Message offset: " + record.offset());           
            System.out.println("Message: " + record.timestamp());
            Date date = new Date(record.timestamp());
            Format format = new SimpleDateFormat("yyyy MM dd HH:mm:ss.SSS");
            System.out.println("Message date: " + format.format(date));
        }
        consumer.commitSync(); 

由于我的消费者将在Spring Boot Service中按需询问如何实现此目标。如果我将属性放在application.properties中,该在哪里指定这些属性,这些属性是在启动时注入的,但是如何在运行时控制MAX_POLL_RECORDS_CONFIG。任何帮助表示赞赏。

解决方法

MAX_POLL_RECORDS_CONFIG仅影响您的kafka客户端将记录返回到spring服务,它将永远不会减少消费者从kafka服务器轮询的字节

enter image description here

请参见上图,无论您的起始偏移量= 150还是190,kafka服务器都会从中(offset = 110,offset = 190)返回全部数据,kafka服务器甚至都不知道有多少记录返回给使用者,他只知道字节大小=(220-110)

所以我认为您可以自己控制记录号,当前它由kafka客户端jar控制,它们都占用了您的jvm本地内存

,

您的问题的答案为here,带有示例代码的答案为this answer。 两者都是由出色的加里·罗素(Gary Russell)撰写的,他是Spring Kafka的主要人物或主要人物之一。

TL; DR:

如果要在运行时任意倒退分区,请使 侦听器实现ConsumerSeekAware并获取对 ConsumerSeekCallback。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...