在 Go 中使用 Uber-Zap 记录器将指定日志发送到 Kafka 接收器

问题描述

我正在尝试使用 zap logger 包创建一个包含文件、控制台和 Kafka 接收器的核心。我有一些非常具体的 INFO 级别日志,我想将它们发送到 Kafka 主题以供下游消费者处理。但是,在当前的实现中,我获得了 Kafka 主题中的所有 INFO 级别日志,即使是我不想要的日志。

有没有办法使用通用的 zap 记录器对象来防止相同级别的不需要的日志不进入任何特定的接收器?

下面是我用来创建单个记录器对象的函数。

func newZapLogger(config Configuration) (Logger,error) {
    var writer zapcore.WriteSyncer
    cores := []zapcore.Core{}

    if config.EnableFile {
        getLogLevel(config.FileLevel)
        if config.LogConfig == true {
            writer = zapcore.Lock(zapcore.AddSync(&lj.Logger{
                Filename: config.FileLocation,MaxSize:  config.LogMaxSize,Compress: config.LogCompression,MaxAge:   config.LogMaxAge,}))
        } else {
            writer = zapcore.Lock(zapcore.AddSync(&lj.Logger{
                Filename: config.FileLocation,}))
        }
        cores = append(cores,zapcore.NewCore(getEncoder(config.FileJSONFormat,config.IsColour),writer,atomLevel))
    }
    if config.EnableConsole {
        getLogLevel(config.ConsoleLevel)
        switch config.Stream {
        case 1:
            writer = zapcore.Lock(os.Stdout)
        case 2:
            writer = zapcore.Lock(os.Stderr)
        case 3:
            writer = zapcore.Lock(zapcore.AddSync(ioutil.Discard))
        default:
            writer = zapcore.Lock(os.Stdout)
        }
        cores = append(cores,zapcore.NewCore(getEncoder(config.ConsoleJSONFormat,atomLevel))
    }
if config.EnableKafka == true {
        highPriority := zap.LevelEnablerFunc(func(lvl zapcore.Level) bool {
            return lvl >= zapcore.WarnLevel
        })

        if len(brokerConn) > 0 {
            var (
                kl  LogKafka
                err error
            )
            kl.Topic = config.KafkaTopic
            config := sarama.NewConfig()
            config.Producer.RequiredAcks = sarama.WaitForAll
            config.Producer.Partitioner = sarama.NewRandomPartitioner
            config.Producer.Return.Successes = true
            config.Producer.Return.Errors = true

            kl.Producer,err = sarama.NewSyncProducer(brokerConn,config)
            if err != nil {
                return nil,fmt.Errorf("Failed to initialise kafka logger,connect to kafka failed: %v",err)
            } else {
                topicErrors := zapcore.AddSync(&kl)
                kafkaEncoder := zapcore.NewJSONEncoder(zap.NewDevelopmentEncoderConfig())
                cores = append(cores,zapcore.NewCore(kafkaEncoder,topicErrors,highPriority))
            }
        } else {
            return nil,no broker specified")
        }
    }

    appendedCore := zapcore.NewTee(cores...)
    logger := zap.New(appendedCore,zap.AddCaller(),zap.AddCallerSkip(1)).Sugar()
    defer logger.Sync()
    return logger,nil
}

我正在使用 Sarama 包来实现 kafka 生产者。 我还考虑过使用自定义日志记录级别。但是,zap 1.0v 不支持。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...