Kafka 消费者群体滞后:初始和低贯穿案例问题 监控滞后并做一些工作问题

问题描述

用例:

  • Producer:在主题 db.inventory.customers
  • 写入 Kafka
  • ConsumerGroup1 (cg1):从 db.inventory.customers 读取并写入 loader-b.inventory.customers
  • ConsumerGroup2 (cg2):从 loader-b.inventory.customers 读取并写入 Github。

监控滞后并做一些工作

我们监控 cg1 滞后和 cg2 滞后。当两个消费者组的滞后都在 0 <= lag <= 100 范围内时,我们会执行一些任务

问题

问题在于低吞吐量的加载器主题,cg2 消失了,所以我们不知道它的延迟并认为它是 -1。我们的条件从来没有得到满足,我们被卡住了。

现在如果我们考虑条件为 0 <= lag <= 100 用于 cg1 和 -1 <= lag <= 100 用于 cg2

然后,在没有创建 cg2 的第一次运行中,它会认为满足条件。但我们不希望那样。我们希望它做一些工作,然后就会出现延迟。

我该怎么办?

代码

func (t *kafkaWatch) consumerGroupLag(
    id string,topic string,partition int32,broker *sarama.Broker,) (
    int64,error,) {
    defaultLag := int64(-1)

    lastOffset,err := t.client.GetOffset(topic,partition,sarama.OffsetNewest)
    if err != nil {
        return defaultLag,fmt.Errorf("Error getting offset for topic partition: %s,err: %v",topic,err)
    }

    offsetFetchRequest := sarama.OffsetFetchRequest{
        ConsumerGroup: id,Version:       1,}
    offsetFetchRequest.AddPartition(topic,partition)

    err = broker.Open(t.client.Config())
    if err != nil && err != sarama.ErrAlreadyConnected {
        return defaultLag,fmt.Errorf("Error opening broker connection again,err)
    }

    offsetFetchResponse,err := broker.FetchOffset(&offsetFetchRequest)
    if err != nil {
        return defaultLag,fmt.Errorf(
            "Error fetching offset for offsetFetchRequest: %s %v,offsetFetchRequest,err)
    }
    if offsetFetchResponse == nil {
        return defaultLag,fmt.Errorf(
            "OffsetFetch request got no response for request: %+v",offsetFetchRequest)
    }

    for topicInResponse,partitions := range offsetFetchResponse.Blocks {
        if topicInResponse != topic {
            continue
        }

        for partitionInResponse,offsetFetchResponseBlock := range partitions {
            if partition != partitionInResponse {
                continue
            }
            // Kafka will return -1 if there is no offset associated
            // with a topic-partition under that consumer group
            if offsetFetchResponseBlock.Offset == -1 {
                klog.V(4).Infof("%s not consumed by group: %v",id)
                return defaultLag,nil
            }
            if offsetFetchResponseBlock.Err != sarama.ErrNoError {
                return defaultLag,fmt.Errorf(
                    "Error since offsetFetchResponseBlock.Err != sarama.ErrNoError for offsetFetchResponseBlock.Err: %+v",offsetFetchResponseBlock.Err)
            }
            return lastOffset - offsetFetchResponseBlock.Offset,nil
        }
    }

    klog.Warningf("%s for group is not active or present in Kafka",topic)
    return defaultLag,nil
}

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...