如何在Kafka MirrorMaker 2.0实例中看到延迟?

问题描述

我从原始的Kafka MirrorMaker迁移到MirrorMaker 2.0,以便将主题一个集群复制到另一个集群。我正在运行in the docs中所述的专用MirrorMaker群集。

假设我正在复制一个名为test-topic主题

Cluster A       Cluster B
----------      ----------
test-topic ---> A.test-topic

如何确定A.test-topic落后test-topic多远?

原始的MirrorMaker创建了消费者组,因此我提到了该消费者组的滞后。 MirrorMaker 2.0不会创建使用者组,因此我不能用它来确定滞后时间。

解决方法

在KIP-382中,有一个度量标准可以找到复制记录的数量。但不是直接计算滞后的方法。 https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0#KIP382:MirrorMaker2.0-PublicInterfaces

,

我也有相同的用例。在MM2中,他们使用consumer.assign()而不是consumer.subscribe()来使用源集群

由于Assign不需要任何groupId,因此我们无法使用使用者组来获取滞后时间。

作为一种解决方法,我做了以下事情:

  1. 调度程序将每15分钟定期运行一次。
  2. 它将获取源集群主题和目标集群主题的日志末尾偏移量。
  3. 我们可以将两者进行比较,并根据延迟来配置警报。

为了找到对数结束偏移,我们可以为该主题创建一个使用者,并可以使用consumer.seekToEnd来获取该位置。另外,您需要确保在此流程中必须跳过内部主题。

,

引用Monitoring MirrorMaker2


MirrorMaker2的操作应与任何其他使用者组相同(因为它使用Connect框架)。该组将以前缀connect-开头,然后是您为其赋予的连接器"name"属性

您可以使用带有组ID的kafka-consumer-groups命令,也可以导出到诸如Burrow或Prometheus之类的外部工具

,

您不能直接滞后。MM2在目标群集{mm2-offets}中创建内部主题。source-cluster-name.internal.MM2在此处提交偏移量。消息键为{topic}-{partition},值为offset。一种方法是检查源主题的日志结束偏移量,并将其与内部主题中对该分区提交的偏移量进行比较。