问题描述
我有一个 KafkaConsumer,它需要订阅两个主题 topicA
和 topicB
。但是我需要一些不同的参数。例如。如果我需要 auto.offset.reset
topicA
是 earilest
而 topicB
应该是 latest
。我认为没有简单的方法可以做到这一点。一种选择是运行两个消费者,但在这种情况下,我需要两个轮询线程,因此应该处理多线程。有没有更简单的方法?
解决方法
创建两个(或更多)线程是正确的。
消费者不是线程安全的,无论如何都应该与其他进程隔离和分离。
您可以使用更高级别的 Kafka 库(例如 Vert.x / Spring)来简化此操作。
,如果您需要使用一个消费者,那么我认为您可以将 auto.reset.offset
设置为 latest
,然后手动移动 topicA
的偏移量(如果需要)。为此,您可以在订阅和轮询循环之间:
- 获取分配给消费者的分区(方法 assignment)
- 通过过滤前一点的
topicA
分区(方法 committed)来检查主题topicA
的提交偏移量。如果结果为null
,则调用 seekToBeginning 方法。