Golang删除Kafka主题中的所有记录

问题描述

嗨,我正在用 Go 和 Kafka 编写一个服务,我需要实现一个删除所有端点,该端点将从特定主题删除所有记录。但是我找不到合适的方法来做到这一点。我正在为 Kafka 使用 Sarama library

到目前为止,我能找到的实现 delete all 的唯一两种方法删除主题,这似乎不是处理此问题的有效方法,第二种方法是使用 Sarama 中的 DeleteRecords 函数库,但是此函数删除偏移量小于相应分区的给定偏移量的记录。这意味着我必须先获得最新的偏移量。

基本上我正在寻找做这种事情的最佳方式。有人可以帮助我吗?最佳做法是什么?也许我错过了一些东西。我真的很感激一些例子。谢谢!

解决方法

如果您想修剪所有消息,另一种方法是将主题的保留时间减少到一个较小的值(例如 100 毫秒)。等待代理从主题中删除所有记录,然后将主题保留设置为其原始值。操作方法如下。

首先,将保留时间设置为 100 毫秒。

kafka-configs --zookeeper localhost:2181 \
--entity-type topics \
--entity-name my-topic \
--alter --add-config retention.ms=100

替代解决方案:

删除主题并重新创建 不像前两种方法那么优雅,但在某些情况下它可能是一个更简单的解决方案(例如,如果主题创建是脚本化的)。

kafka-topics --bootstrap-server localhost:9092 \
--topic my-topic \
--delete

然后再次创建:

kafka-topics --bootstrap-server localhost:9092 \
--topic my-topic \
--create \
--partitions <number_of_partitions> \
--replication-factor <replication_factor>