如何正确寻找Kafka中的最后一个偏移量?

问题描述

我正在尝试从上次偏移读取 Kafka 主题,但无法正确读取:

属性文件中我设置了这些:

group.id=my_group
client.id=my_client
enable.auto.commit=true
auto.offset.reset=最早的
isolation.level=read_committed

然后是代码

consumer = new KafkaConsumer<>(props); // read properties
consumer.subscribe(Collections.singletonList(topic));
ConsumerRecords<String,String> consumerRecords = consumer.poll(Duration.ofSeconds(1L));
consumerRecords.forEach(consumerRecord -> System.out.println(consumerRecord.offset());

即使我在主题中有一些项目,我也可以看到如何从 0 打印偏移量。

在日志文件中我可以看到这个(缩短):

[Consumer clientId=my_client,groupId=my_group] 分配完成 对于第 1 代的组: {my_client-f1678be7-ce6b-48e8-acf2-741ab28f7266=分配(partitions=[mytopic-0])}

[Consumer clientId=my_client,groupId=my_group] 加入成功 第 1 代 [Consumer clientId=my_client,groupId=my_group] 通知转让人有关新的 分配(partitions=[mytopic-0])

[Consumer clientId=my_client,groupId=my_group] 添加新分配的 分区:mytopic-0

[Consumer clientId=my_client,groupId=my_group] 发现没有提交 分区 mytopic-0 的偏移量

[Consumer clientId=my_client,> groupId=my_group] 重置偏移量 将 mytopic-0 分区到偏移量 0。

知道我做错了什么吗?我用 seekToBeginning() + poll() + commitSync() + seekToEnd() 尝试了一些魔法,它以某种方式起作用,但我认为认情况下这应该起作用。

解决方法

auto.offset.reset=latest会解决你的问题

https://docs.confluent.io/platform/current/clients/consumer.html#offset-management