redigo 错误日志:写入:对等方重置连接

问题描述

几乎相同的时间(与redigo错误日志的时间点:write: connection reset by peer?),redis错误日志:
Client id=45183 addr=127.0.0.1:40420 fd=39 name= age=39706 idle=46 flags=N db=0 sub=8 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=16114 oll=528 omem=8545237 events=rw cmd=ping scheduled to be closed ASAP for overcoming of output buffer limits.

转到错误日志

write tcp 127.0.0.1:40806->127.0.0.1:6379: write: connection reset by peer

在此之前,Go 程序大约有 7 分钟没有收到订阅消息。我认为这是由于消息未被消费导致的缓存溢出。

Redis client-output-buffer-limit 是认配置。 linux fd和连接数都正常,找不到不可用的原因。

这是我的代码

server.go

func WaitFroMsg(ctx context.Context,pool *redis.Pool,onMessage func(channel string,data []byte) error,channel ...string) (err error) {
    conn := pool.Get()
    psc := redis.PubSubConn{Conn: conn}
    if err := psc.Subscribe(redis.Args{}.AddFlat(channel)...); err != nil {
        return err
    }
    done := make(chan error,1)
    go func() {
        for {
            switch n := psc.Receive().(type) {
            case error:
                done <- fmt.Errorf("redis pubsub receive err: %v",n)
                return
            case redis.Message:
                if err = onMessage(n.Channel,n.Data); err != nil {
                    done <- err
                    return
                }
            case redis.Subscription:
                if n.Count == 0 {
                    fmt.Println("all channels are unsubscribed",channel)
                    done <- nil
                    return
                }
            }
        }
    }()
    const healthCheck = time.Minute
    ticker := time.NewTicker(healthCheck) 
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            if err = psc.Ping(""); err != nil {
                fmt.Println("healthCheck ",err,channel)
                return err
            }
        case err := <-done:
            return err
        case <-ctx.Done(): 
            if err := psc.Unsubscribe(); err != nil {
                return fmt.Errorf("redis unsubscribe Failed: %v",err)
            }
            return nil
        }
    }
}

pool.go

func NewPool(addr string,db int) *redis.Pool {
    return &redis.Pool{
        MaxIdle:     3,IdleTimeout: 240 * time.Second,Dial: func() (redis.Conn,error) {
            c,err := redis.Dial("tcp",addr)
            if err != nil {
                return nil,err
            }
            if _,err = c.Do("SELECT",db); err != nil {
                c.Close()
                return nil,err
            }
            return c,nil
        },TestOnBorrow: func(c redis.Conn,t time.Time) error {
            if time.Since(t) < time.Minute {
                return nil
            }
            _,err := c.Do("PING")
            fmt.Println("PING error",err)
            return err
        },}
}

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)