问题描述
我要求在哪里需要一个Spring Boot Rest Service,客户端应用程序每30分钟将调用一次,服务将返回
-
基于查询参数中指定数量的最新消息数,例如http://messages.com/getNewMessages?number=10在这种情况下应返回10条消息
-
条消息的数量基于查询参数中指定的数量和偏移量,例如在这种情况下,http://messages.com/getSpecificMessages?number=5&start=123应该返回5条以偏移量123开始的消息。
我有一个简单的独立应用程序,它运行良好。这是我测试过的,可以为将其合并到服务中提供一些指导。
public static void main(String[] args) {
// create kafka consumer
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"my-first-consumer-group");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,args[0]);
Consumer<String,String> consumer = new KafkaConsumer<>(properties);
// subscribe to topic
consumer.subscribe(Collections.singleton("test"));
consumer.poll(0);
//get to specific offset and get specified number of messages
for (TopicPartition partition : consumer.assignment())
consumer.seek(partition,args[1]);
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(5000));
System.out.println("Total Record Count ******* : " + records.count());
for (ConsumerRecord<String,String> record : records) {
System.out.println("Message: " + record.value());
System.out.println("Message offset: " + record.offset());
System.out.println("Message: " + record.timestamp());
Date date = new Date(record.timestamp());
Format format = new SimpleDateFormat("yyyy MM dd HH:mm:ss.SSS");
System.out.println("Message date: " + format.format(date));
}
consumer.commitSync();
由于我的消费者将在Spring Boot Service中按需询问如何实现此目标。如果我将属性放在application.properties中,该在哪里指定这些属性,这些属性是在启动时注入的,但是如何在运行时控制MAX_POLL_RECORDS_CONFIG。任何帮助表示赞赏。
解决方法
MAX_POLL_RECORDS_CONFIG仅影响您的kafka客户端将记录返回到spring服务,它将永远不会减少消费者从kafka服务器轮询的字节
请参见上图,无论您的起始偏移量= 150还是190,kafka服务器都会从中(offset = 110,offset = 190)返回全部数据,kafka服务器甚至都不知道有多少记录返回给使用者,他只知道字节大小=(220-110)
所以我认为您可以自己控制记录号,当前它由kafka客户端jar控制,它们都占用了您的jvm本地内存
,您的问题的答案为here,带有示例代码的答案为this answer。 两者都是由出色的加里·罗素(Gary Russell)撰写的,他是Spring Kafka的主要人物或主要人物之一。
TL; DR:
如果要在运行时任意倒退分区,请使 侦听器实现ConsumerSeekAware并获取对 ConsumerSeekCallback。