将 sarama 消费者组连接到 AWS 上托管的融合云 kafka没有 AMS

问题描述

这是我第一次尝试连接到托管在 AWS 服务器上的 Confluent Kafka 集群,但不使用任何 Amazon Managed Streaming 服务。

使用此代码(减去 Config.Net.SASL 设置),我能够毫无问题地连接到旧的自托管集群。

我不知道我做错了什么。

连接时,我收到此错误:

[sarama] 2021/01/13 08:21:54 Initializing new client
[sarama] 2021/01/13 08:21:54 client/metadata fetching metadata for all topics from broker <redacted URL>
[sarama] 2021/01/13 08:21:54 Failed to connect to broker <redacted URL>: dial tcp <redacted IP>: connect: connection refused
[sarama] 2021/01/13 08:21:54 client/metadata got error from broker -1 while fetching metadata: dial tcp <redacted IP>: connect: connection refused
[sarama] 2021/01/13 08:21:54 client/metadata no available broker to send metadata request to
[sarama] 2021/01/13 08:21:54 client/brokers resurrecting 1 dead seed brokers

这是我的代码:

package main

/*
#define EXTSRV_IP
#define CENTRAL_IP
#define BILLING_IP
#define GO_ISP

#include <srvindx.h>
#include <rdb_util.c>
*/
import "C"

import (
    "context"
    "database/sql"
    "encoding/json"
    "errors"
    "flag"
    "fmt"
    "log"
    "os"
    "os/signal"
    "reflect"
    "runtime/debug"
    "strconv"
    "sync"
    "syscall"
    "time"
    "unsafe"

    "github.com/Shopify/sarama"
    // "github.com/aws/aws-sdk-go/aws/credentials"
    // v4 "github.com/aws/aws-sdk-go/aws/signer/v4"
    //
    _ "github.com/go-sql-driver/mysql"

    gl "gocode.prairiesys.com/lib/platform/genlog"
    tk "gocode.prairiesys.com/lib/platform/token"
)

// StatusNew Const
const StatusNew = 1

// StatusConnected Const
const StatusConnected = 2

// StatusFinished Const
const StatusFinished = 7

// StatusError Const
const StatusError = 8

// StatusTerminated Const
const StatusTerminated = 9

// ConditionalData struct
type ConditionalData struct {
    UserID     sql.NullInt64
    MailboxID  sql.NullInt64
    TemplateID sql.NullInt64
    Account1   sql.NullString
    Account2   sql.NullString
    Block      sql.NullString
    UpdateDate time.Time
}

// Consumer represents a Sarama consumer group consumer
type Consumer struct {
    ready chan bool
}

// KafkaConnectionData struct
type KafkaConnectionData struct {
    ID                int
    Group             string
    Assignor          string
    Brokers           []string
    Topics            []string
    Version           sarama.KafkaVersion
    Verbose           bool
    Running           bool
    Status            int
    StatusDescription string
    ExternalService   string
    APIKey            string
    APISecret         string
    // CommChannel       chan int
    // Logged            bool
    CallID string
}

// Sarama configuration options
var (
    wg1           sync.WaitGroup
    wg2           sync.WaitGroup
    done          = false
    appCallID     string
    logLevel      string
    qpickupErrors int
    template      string
    account       string
    condData      []ConditionalData
    kafkaConnData []KafkaConnectionData
    sleepCount    int
    sleepDuration time.Duration
    nextRefresh   time.Time
    dir           string
)

// Setup is run at the beginning of a new session,before ConsumeClaim
func (consumer *Consumer) Setup(session sarama.ConsumerGroupSession) error {
    // defer recovery("Setup")

    gl.Log(gl.LVL1,gl.Name,"Consumer Setup()...")

    gl.Log(gl.LVL1,"Setup: Claims [%v]",session.Claims())
    for key,value := range session.Claims() {
        gl.Log(gl.LVL1,"Setup: Claims topic[%s] partitions[%d]",key,value)
    }
    // Mark the consumer as ready
    close(consumer.ready)
    return nil
}

// Cleanup is run at the end of a session,once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(session sarama.ConsumerGroupSession) error {
    // defer recovery("Cleanup")

    gl.Log(gl.LVL1,"Cleanup: Claims [%v]",value)
    }

    return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession,claim sarama.ConsumerGroupClaim) error {
    // defer recovery("ConsumeClaim")

    gl.Log(gl.LVL1,"ConsumeClaim: Topic [%s],Partition [%d]",claim.Topic(),claim.Partition())
    // NOTE:
    // Do not move the code below to a goroutine.
    // The `ConsumeClaim` itself is called within a goroutine,see:
    // https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
    for message := range claim.Messages() {
        gl.Log(gl.LVL1,"Message: value = %s,timestamp = %v,topic = %s",string(message.Value),message.Timestamp,message.Topic)
        callID := C.GoString(C.strange_name())
        buildBlockFromJSON(message,callID)
        session.MarkMessage(message,fmt.Sprintf("CSGIM:%s",callID))
    }

    return nil
}

func buildBlockFromJSON(message *sarama.ConsumerMessage,callID string) {
    defer recovery("buildBlockFromJSON")
    // defer gl.Leave(gl.Enter())

    var err error
    var block string
    var temp string

    block,err = tk.BtokInit(tk.BLK_TYP_UNKNOWN,4096)
    if err != nil {
        gl.Log(gl.LVL1,"failed to initialize block,err[%s]",err.Error())

    } else {
        temp = fmt.Sprintf("%v",message.Timestamp)
        block,err = tk.BtokAdd(block,"MessageTimestamp",tk.FLD_STRING,temp)

        temp = fmt.Sprintf("%v",message.BlockTimestamp)
        block,"MessageBlockTimestamp",temp)

        block,"MessageTopic",message.Topic)
        block,"MessagePartition",fmt.Sprintf("%d",message.Partition))
        block,"MessageOffset",message.Offset))

        var anyJSON map[string]interface{}

        if err = json.Unmarshal(message.Value,&anyJSON); err != nil {
            gl.Log(gl.LVL1,"FAILURE: json.Unmarshal error [%s]",err.Error())
        } else if block,err = rangeMap(block,anyJSON,""); err != nil {
            gl.Log(gl.LVL1,"FAILURE: rangeMap error [%s]",err.Error())
        } else if err := sendToExternalService(block,callID); err != nil {
            gl.Log(gl.LVL1,"FAILURE: sendToExternalService error [%s]",err.Error())
        } else {
            gl.Log(gl.LVL1,"MWS DEBUG: anyJSON [%v]",anyJSON)
        }

    }
}

func findGroupInList(group string,list []KafkaConnectionData) int {
    defer recovery("findGroupInList")
    // defer gl.Leave(gl.Enter())

    response := -1

    for idx,kafConData := range list {
        if group == kafConData.Group {
            response = idx
            break
        }
    }

    return response
}

func findStringInList(item string,list []string) (bool,error) {
    defer recovery("findStringInList")
    // defer gl.Leave(gl.Enter())

    response := false
    var err error

    for _,str := range list {
        if item == str {
            response = true
            break
        }
    }

    if !response {
        err = errors.New("Group not found in Conditional Data")
    } else {
        err = nil
    }

    return response,err
}

func init() {
    defer recovery("init")

    if !initGenLog() {
        gl.Log(gl.All,101,"FAILURE: genlog initialization")
        os.Exit(11)
    } else {
        gl.Log(gl.LVL1,"SUCCESS: genlog initialization")
    }

    if !initArgs() {
        gl.Log(gl.All,"FAILURE: argument initialization")
        os.Exit(12)
    } else {
        gl.Log(gl.All,"SUCCESS: argument initialization")
    }

    if !initConfigurations() {
        gl.Log(gl.All,"FAILURE: configuration initialization")
        os.Exit(13)
    } else {
        gl.Log(gl.All,"SUCCESS: configuration initialization")
    }

    if !initSrv() {
        gl.Log(gl.All,"FAILURE: srv initialization")
        os.Exit(14)
    } else {
        gl.Log(gl.LVL1,"SUCCESS: srv initialization")
    }

    gl.Log(gl.All,"SUCCESS: initialization complete")

}

func initArgs() bool {
    defer recovery("initArgs")
    // defer gl.Leave(gl.Enter())

    gl.Log(gl.LVL1,"Initialize Arguments Begin")

    flag.StringVar(&logLevel,"L","","Log level")

    flag.StringVar(&template,"template","USER template to query on for connection data")
    flag.StringVar(&account,"account","USER account to query on for connection data")

    gl.Log(gl.LVL2,"Begin flag.Parse()")
    flag.Parse()
    gl.Log(gl.LVL2,"End   flag.Parse()")

    if flag.Parsed() {
        gl.Log(gl.LVL1,"logLevel [%s]",logLevel)

        gl.Log(gl.LVL1,"template  [%s]",template)
        gl.Log(gl.LVL1,"account   [%s]",account)

    } else {
        gl.Log(gl.LVL1,"flags not parsed properly")

    }

    if len(template) == 0 {
        gl.Log(gl.LVL1,"required argument [-template] missing")
        panic("no user template defined,please set the -template flag")
    }

    if len(account) == 0 {
        gl.Log(gl.LVL1,"required argument [-account] missing")
        panic("no user account defined,please set the -account flag")
    }

    gl.Log(gl.LVL1,"Initialize Arguments Completed")

    return true
}

func initConfigurations() bool {
    defer recovery("initConfigurations")
    // defer gl.Leave(gl.Enter())

    // CODE REDACTED

    return response
}

func initGenLog() bool {

    if !gl.Init() {
        gl.Log(gl.LVL1,"Error: Failed to initialize logging")
        return false
    }

    appCallID = C.GoString(C.strange_name())
    gl.ID(gl.File2|gl.File3,nil,nil)
    gl.Conf(gl.File2,gl.NoChg,gl.NoChgPtr,gl.Date2Def,nil)
    gl.ID(gl.File2|gl.File3,gl.SToCS(appCallID),nil)
    gl.Log(gl.LVL3,"Genlog Init Completed")

    return true
}

func initSrv() bool {
    var vt C.int

    defer recovery("initSrv")
    // defer gl.Leave(gl.Enter())

    // Get vterm
    if vt = C.get_vterm_id(C.TRUE); vt < 0 {
        gl.Log(gl.LVL1,"Error: Failed to get vterm")
        return false
    }

    if C.srv_init(nil,0) < 0 {
        gl.Log(gl.LVL1,"Error: Failed srv init")
        return false
    }

    return true
}

func logKafkaConnectionData(kcd KafkaConnectionData,desc string) {
    defer recovery("logKafkaConnectionData")
    // defer gl.Leave(gl.Enter())

    // CODE REDACTED
}

func main() {
    defer recovery("main")
    // defer gl.Leave(gl.Enter())

    var t1 time.Time
    var t2 time.Time
    t1 = time.Now()

    for done == false {
        t2 = time.Now()

        if time.Duration(t2.Sub(t1)) > (time.Duration(2) * time.Second) {
            for idx := range kafkaConnData {
                if kafkaConnData[idx].Running == false {
                    if kafkaConnData[idx].Status == StatusNew {
                        kafkaConnData[idx].Running = true
                        kafkaConnData[idx].Status = StatusConnected
                        wg1.Add(1)
                        go saramaConsumer(&kafkaConnData[idx])
                    }
                }
            }

            t1 = time.Now()
        }

    }

    wg1.Wait()

    gl.Log(gl.LVL1,"Exiting application normally")
    os.Exit(0)
}

func rangeMap(block string,myMap map[string]interface{},prefix string) (string,error) {
    defer recovery("rangeMap")
    // defer gl.Leave(gl.Enter())

    // CODE REDACTED
    
    return block,nil
}

func recovery(input string) {
    // defer gl.Leave(gl.Enter())

    if err := recover(); err != nil {
        gl.Log(gl.All,"%s,Exception: %v\n%s\n",input,err,debug.Stack())
        os.Exit(31)
    }
}

func saramaConsumer(kcd *KafkaConnectionData) {
    id := kcd.ID
    defer recovery("saramaConsumer")
    defer wg1.Done()

    gl.Log(gl.LVL1,"ID[%d]. Starting a new Sarama consumer",id)

    if kcd.Verbose {
        sarama.Logger = log.New(os.Stdout,"[sarama] ",log.LstdFlags)
    }

    /**
     * Construct a new Sarama configuration.
     * The Kafka cluster version has to be defined before the consumer/producer is initialized.
     */
    config := sarama.NewConfig()
    config.Version = kcd.Version

    switch kcd.Assignor {
    case "sticky":
        config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
    case "roundrobin":
        config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
    case "range":
        config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
    default:
        gl.Log(gl.LVL1,"ID[%d]. Unrecognized consumer group partition assignor [%s]",id,kcd.Assignor)
        log.Panicf("Unrecognized consumer group partition assignor: %s",kcd.Assignor)
    }

    // config.Net.TLS.Enable = false
    // config.Net.SASL.Handshake = false
    // config.Net.SASL.AuthIdentity = kcd.APIKey
    config.Net.SASL.Enable = true
    config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
    config.Net.SASL.User = kcd.APIKey
    config.Net.SASL.Password = kcd.APISecret
    config.ClientID = "CSGIM"
    config.Consumer.Offsets.Initial = sarama.OffsetOldest

    if err := config.Validate(); err != nil {
        gl.Log(gl.LVL1,"Configuration validation failed,err [%s]",err)
    } else {
        gl.Log(gl.LVL1,"Configuration validation good,config [%v]",config)
    }

    /**
     * Setup a new Sarama consumer group
     */
    consumer := Consumer{
        ready: make(chan bool),}

    ctx,cancel := context.WithCancel(context.Background())
    client,err := sarama.NewConsumerGroup(kcd.Brokers,kcd.Group,config)
    if err != nil {
        gl.Log(gl.LVL1,"ID[%d]. Error creating consumer group client [%s]",err)
        log.Panicf("Error creating consumer group client: %v",err)
    }

    wg2.Add(1)

    go func(kcd *KafkaConnectionData) {
        defer wg2.Done()

        for {
            // `Consume` should be called inside an infinite loop,when a
            // server-side rebalance happens,the consumer session will need to be
            // recreated to get the new claims
            dir = kcd.ExternalService
            if err := client.Consume(ctx,kcd.Topics,&consumer); err != nil {
                gl.Log(gl.LVL1,"Error from consumer: %s",err)
                log.Panicf("Error from consumer: %v",err)
            }
            // check if context was cancelled,signaling that the consumer should stop
            if ctx.Err() != nil {
                gl.Log(gl.LVL1,"ID[%d]. Error from context: %s",kcd.ID,ctx.Err())
                return
            }
            consumer.ready = make(chan bool)
        }
    }(kcd)

    <-consumer.ready // Await till the consumer has been set up
    gl.Log(gl.LVL1,"ID[%d]. Sarama consumer up and running!...",id)

    sigterm := make(chan os.Signal,1)
    // signal.Notify(sigterm,syscall.SIGINT,syscall.SIGTERM)
    signal.Notify(sigterm,syscall.SIGHUP,syscall.SIGTERM,syscall.SIGQUIT,syscall.SIGTSTP)
    select {
    case <-ctx.Done():
        done = true
        gl.Log(gl.LVL1,"ID[%d]. terminating: context cancelled",id)
    case <-sigterm:
        done = true
        gl.Log(gl.LVL1,"ID[%d]. terminating sarama: via signal",id)
    }

    cancel()
    wg2.Wait()

    if err = client.Close(); err != nil {
        gl.Log(gl.LVL1,"ID[%d]. Error closing client [%s]",err)
        log.Panicf("Error closing client: %s",err)
    }
}

func sendToExternalService(block string,callID string) error {
    defer recovery("sendToExternalService")
    // CODE REDACTED
    return err
}

// sToCS converts a Go string to a C string
func sToCS(s string) *C.char {
    defer recovery("sToCS")
    // make slice of bytes from string
    b := make([]byte,len(s)+1)
    copy(b[:],s)

    return (*C.char)(unsafe.Pointer(&b[0]))
}

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...