Kafka消费者偏移量自动重置等参数

问题描述

我有一个 KafkaConsumer,它需要订阅两个主题 topicAtopicB。但是我需要一些不同的参数。例如。如果我需要 auto.offset.reset topicAearilesttopicB 应该是 latest。我认为没有简单的方法可以做到这一点。一种选择是运行两个消费者,但在这种情况下,我需要两个轮询线程,因此应该处理多线程。有没有更简单的方法

解决方法

创建两个(或更多)线程是正确的。

消费者不是线程安全的,无论如何都应该与其他进程隔离和分离。

您可以使用更高级别的 Kafka 库(例如 Vert.x / Spring)来简化此操作。

,

如果您需要使用一个消费者,那么我认为您可以将 auto.reset.offset 设置为 latest,然后手动移动 topicA 的偏移量(如果需要)。为此,您可以在订阅和轮询循环之间:

  1. 获取分配给消费者的分区(方法 assignment
  2. 通过过滤前一点的 topicA 分区(方法 committed)来检查主题 topicA 的提交偏移量。如果结果为 null,则调用 seekToBeginning 方法。