问题描述
我正在尝试从上次偏移读取 Kafka 主题,但无法正确读取:
group.id=my_group
client.id=my_client
enable.auto.commit=true
auto.offset.reset=最早的
isolation.level=read_committed
然后是代码:
consumer = new KafkaConsumer<>(props); // read properties
consumer.subscribe(Collections.singletonList(topic));
ConsumerRecords<String,String> consumerRecords = consumer.poll(Duration.ofSeconds(1L));
consumerRecords.forEach(consumerRecord -> System.out.println(consumerRecord.offset());
即使我在主题中有一些项目,我也可以看到如何从 0 打印偏移量。
在日志文件中我可以看到这个(缩短):
[Consumer clientId=my_client,groupId=my_group] 分配完成 对于第 1 代的组: {my_client-f1678be7-ce6b-48e8-acf2-741ab28f7266=分配(partitions=[mytopic-0])}
[Consumer clientId=my_client,groupId=my_group] 加入成功 第 1 代 [Consumer clientId=my_client,groupId=my_group] 通知转让人有关新的 分配(partitions=[mytopic-0])
[Consumer clientId=my_client,groupId=my_group] 添加新分配的 分区:mytopic-0
[Consumer clientId=my_client,groupId=my_group] 发现没有提交 分区 mytopic-0 的偏移量
[Consumer clientId=my_client,> groupId=my_group] 重置偏移量 将 mytopic-0 分区到偏移量 0。
知道我做错了什么吗?我用 seekToBeginning() + poll() + commitSync() + seekToEnd() 尝试了一些魔法,它以某种方式起作用,但我认为默认情况下这应该起作用。
解决方法
auto.offset.reset=latest会解决你的问题
https://docs.confluent.io/platform/current/clients/consumer.html#offset-management