问题描述
用例:
- Producer:在主题
db.inventory.customers
写入 Kafka
- ConsumerGroup1 (cg1):从
db.inventory.customers
读取并写入loader-b.inventory.customers
- ConsumerGroup2 (cg2):从
loader-b.inventory.customers
读取并写入 Github。
监控滞后并做一些工作
我们监控 cg1 滞后和 cg2 滞后。当两个消费者组的滞后都在 0 <= lag <= 100
范围内时,我们会执行一些任务。
问题
问题在于低吞吐量的加载器主题,cg2 消失了,所以我们不知道它的延迟并认为它是 -1。我们的条件从来没有得到满足,我们被卡住了。
现在如果我们考虑条件为
0 <= lag <= 100
用于 cg1
和 -1 <= lag <= 100
用于 cg2
然后,在没有创建 cg2 的第一次运行中,它会认为满足条件。但我们不希望那样。我们希望它做一些工作,然后就会出现延迟。
我该怎么办?
代码
func (t *kafkaWatch) consumerGroupLag(
id string,topic string,partition int32,broker *sarama.Broker,) (
int64,error,) {
defaultLag := int64(-1)
lastOffset,err := t.client.GetOffset(topic,partition,sarama.OffsetNewest)
if err != nil {
return defaultLag,fmt.Errorf("Error getting offset for topic partition: %s,err: %v",topic,err)
}
offsetFetchRequest := sarama.OffsetFetchRequest{
ConsumerGroup: id,Version: 1,}
offsetFetchRequest.AddPartition(topic,partition)
err = broker.Open(t.client.Config())
if err != nil && err != sarama.ErrAlreadyConnected {
return defaultLag,fmt.Errorf("Error opening broker connection again,err)
}
offsetFetchResponse,err := broker.FetchOffset(&offsetFetchRequest)
if err != nil {
return defaultLag,fmt.Errorf(
"Error fetching offset for offsetFetchRequest: %s %v,offsetFetchRequest,err)
}
if offsetFetchResponse == nil {
return defaultLag,fmt.Errorf(
"OffsetFetch request got no response for request: %+v",offsetFetchRequest)
}
for topicInResponse,partitions := range offsetFetchResponse.Blocks {
if topicInResponse != topic {
continue
}
for partitionInResponse,offsetFetchResponseBlock := range partitions {
if partition != partitionInResponse {
continue
}
// Kafka will return -1 if there is no offset associated
// with a topic-partition under that consumer group
if offsetFetchResponseBlock.Offset == -1 {
klog.V(4).Infof("%s not consumed by group: %v",id)
return defaultLag,nil
}
if offsetFetchResponseBlock.Err != sarama.ErrNoError {
return defaultLag,fmt.Errorf(
"Error since offsetFetchResponseBlock.Err != sarama.ErrNoError for offsetFetchResponseBlock.Err: %+v",offsetFetchResponseBlock.Err)
}
return lastOffset - offsetFetchResponseBlock.Offset,nil
}
}
klog.Warningf("%s for group is not active or present in Kafka",topic)
return defaultLag,nil
}
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)