问题描述
这是我第一次尝试连接托管在 AWS 服务器上的 Confluent Kafka 集群,该集群并未使用任何 Amazon Managed Streaming 服务。
使用此代码(减去 Config.Net.SASL 设置),我能够毫无问题地连接到旧的自托管集群。
我不知道我做错了什么。
连接时,我收到此错误:
[sarama] 2021/01/13 08:21:54 Initializing new client
[sarama] 2021/01/13 08:21:54 client/metadata fetching metadata for all topics from broker <redacted URL>
[sarama] 2021/01/13 08:21:54 Failed to connect to broker <redacted URL>: dial tcp <redacted IP>: connect: connection refused
[sarama] 2021/01/13 08:21:54 client/metadata got error from broker -1 while fetching metadata: dial tcp <redacted IP>: connect: connection refused
[sarama] 2021/01/13 08:21:54 client/metadata no available broker to send metadata request to
[sarama] 2021/01/13 08:21:54 client/brokers resurrecting 1 dead seed brokers
这是我的代码:
package main
/*
#define EXTSRV_IP
#define CENTRAL_IP
#define BILLING_IP
#define GO_ISP
#include <srvindx.h>
#include <rdb_util.c>
*/
import "C"
import (
"context"
"database/sql"
"encoding/json"
"errors"
"flag"
"fmt"
"log"
"os"
"os/signal"
"reflect"
"runtime/debug"
"strconv"
"sync"
"syscall"
"time"
"unsafe"
"github.com/Shopify/sarama"
// "github.com/aws/aws-sdk-go/aws/credentials"
// v4 "github.com/aws/aws-sdk-go/aws/signer/v4"
//
_ "github.com/go-sql-driver/mysql"
gl "gocode.prairiesys.com/lib/platform/genlog"
tk "gocode.prairiesys.com/lib/platform/token"
)
// Values stored in KafkaConnectionData.Status.
const (
	// StatusNew marks a connection for which main has not yet started a consumer.
	StatusNew = 1
	// StatusConnected is assigned by main when it launches the consumer goroutine.
	StatusConnected = 2
	// StatusFinished is the "finished" status code.
	StatusFinished = 7
	// StatusError is the "error" status code.
	StatusError = 8
	// StatusTerminated is the "terminated" status code.
	StatusTerminated = 9
)
// ConditionalData is one record of conditional data. Every field except
// UpdateDate is nullable (database/sql Null* types).
// NOTE(review): presumably filled from a database row by the redacted
// configuration code - confirm.
type ConditionalData struct {
	// Nullable numeric identifiers.
	UserID, MailboxID, TemplateID sql.NullInt64
	// Nullable text columns.
	Account1, Account2, Block sql.NullString
	// UpdateDate is the record's update timestamp.
	UpdateDate time.Time
}
// Consumer represents a Sarama consumer group consumer.
// A pointer to it is passed to the consumer group's Consume call as the
// session handler (see Setup, Cleanup and ConsumeClaim).
type Consumer struct {
// ready is closed by Setup once a session has started; saramaConsumer blocks
// on it before reporting the consumer as running and replaces it with a fresh
// channel each time Consume returns.
ready chan bool
}
// KafkaConnectionData describes one Kafka consumer-group connection together
// with its runtime state. main starts one saramaConsumer goroutine per entry.
type KafkaConnectionData struct {
	// ID identifies the connection in log output.
	ID int
	// Group is the consumer group name. Assignor selects the rebalance
	// strategy and must be "sticky", "roundrobin" or "range".
	Group, Assignor string
	// Brokers are the seed broker addresses; Topics are the topics to consume.
	Brokers, Topics []string
	// Version is the Kafka version assigned to the Sarama configuration.
	Version sarama.KafkaVersion
	// Verbose routes Sarama's internal logging to stdout. Running is set by
	// main when the consumer goroutine is launched.
	Verbose, Running bool
	// Status holds one of the Status* constants.
	Status int
	// ExternalService is copied into the package-level dir variable before
	// consuming; APIKey and APISecret are used as the SASL user and password.
	StatusDescription, ExternalService, APIKey, APISecret string
	// CommChannel chan int
	// Logged bool
	CallID string
}
// Package-level state shared by the initialisation code, main and the
// consumer goroutines.
var (
	// wg1 counts the saramaConsumer goroutines started by main; wg2 counts the
	// Consume loops those goroutines spawn.
	wg1, wg2 sync.WaitGroup

	// done is set by saramaConsumer on shutdown and polled by main.
	// NOTE(review): it is read and written from different goroutines without
	// synchronisation.
	done bool

	// appCallID is the call ID obtained in initGenLog.
	appCallID string

	// logLevel, template and account receive the -L, -template and -account
	// command-line flags in initArgs.
	logLevel      string
	qpickupErrors int
	template      string
	account       string

	// NOTE(review): condData and kafkaConnData are presumably filled by the
	// redacted initConfigurations - confirm.
	condData      []ConditionalData
	kafkaConnData []KafkaConnectionData

	sleepCount    int
	sleepDuration time.Duration
	nextRefresh   time.Time

	// dir is overwritten with kcd.ExternalService by each consume loop.
	dir string
)
// Setup is run at the beginning of a new session, before ConsumeClaim.
// It logs the session's claims (topic -> partitions) and then closes
// consumer.ready so that anything waiting on the channel is released.
func (consumer *Consumer) Setup(session sarama.ConsumerGroupSession) error {
	// defer recovery("Setup")
	gl.Log(gl.LVL1, gl.Name, "Consumer Setup()...")
	gl.Log(gl.LVL1, "Setup: Claims [%v]", session.Claims())
	for topic, partitions := range session.Claims() {
		gl.Log(gl.LVL1, "Setup: Claims topic[%s] partitions[%d]", topic, partitions)
	}

	// Mark the consumer as ready.
	close(consumer.ready)
	return nil
}
// Cleanup is run at the end of a session,once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(session sarama.ConsumerGroupSession) error {
// defer recovery("Cleanup")
gl.Log(gl.LVL1,"Cleanup: Claims [%v]",value)
}
return nil
}
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
// For every message it logs the payload, obtains a fresh call ID from the C
// helper strange_name, hands the message to buildBlockFromJSON and then marks
// the message with "CSGIM:<callID>" as metadata. It returns nil once the
// Messages channel is closed.
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// defer recovery("ConsumeClaim")
	gl.Log(gl.LVL1, "ConsumeClaim: Topic [%s],Partition [%d]", claim.Topic(), claim.Partition())

	// NOTE:
	// Do not move the code below to a goroutine.
	// The `ConsumeClaim` itself is called within a goroutine,see:
	// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
	messages := claim.Messages()
	for msg := range messages {
		gl.Log(gl.LVL1, "Message: value = %s,timestamp = %v,topic = %s", string(msg.Value), msg.Timestamp, msg.Topic)

		callID := C.GoString(C.strange_name())
		buildBlockFromJSON(msg, callID)

		// The message is marked whether or not buildBlockFromJSON succeeded;
		// that function only logs its failures.
		metadata := fmt.Sprintf("CSGIM:%s", callID)
		session.MarkMessage(msg, metadata)
	}
	return nil
}
func buildBlockFromJSON(message *sarama.ConsumerMessage,callID string) {
defer recovery("buildBlockFromJSON")
// defer gl.Leave(gl.Enter())
var err error
var block string
var temp string
block,err = tk.BtokInit(tk.BLK_TYP_UNKNOWN,4096)
if err != nil {
gl.Log(gl.LVL1,"failed to initialize block,err[%s]",err.Error())
} else {
temp = fmt.Sprintf("%v",message.Timestamp)
block,err = tk.BtokAdd(block,"MessageTimestamp",tk.FLD_STRING,temp)
temp = fmt.Sprintf("%v",message.BlockTimestamp)
block,"MessageBlockTimestamp",temp)
block,"MessageTopic",message.Topic)
block,"MessagePartition",fmt.Sprintf("%d",message.Partition))
block,"MessageOffset",message.Offset))
var anyJSON map[string]interface{}
if err = json.Unmarshal(message.Value,&anyJSON); err != nil {
gl.Log(gl.LVL1,"FAILURE: json.Unmarshal error [%s]",err.Error())
} else if block,err = rangeMap(block,anyJSON,""); err != nil {
gl.Log(gl.LVL1,"FAILURE: rangeMap error [%s]",err.Error())
} else if err := sendToExternalService(block,callID); err != nil {
gl.Log(gl.LVL1,"FAILURE: sendToExternalService error [%s]",err.Error())
} else {
gl.Log(gl.LVL1,"MWS DEBUG: anyJSON [%v]",anyJSON)
}
}
}
// findGroupInList returns the index of the first entry in list whose Group
// equals group, or -1 when no entry matches.
func findGroupInList(group string, list []KafkaConnectionData) int {
	defer recovery("findGroupInList")
	// defer gl.Leave(gl.Enter())
	for i := range list {
		if list[i].Group == group {
			return i
		}
	}
	return -1
}
// findStringInList reports whether item occurs in list. It returns
// (true, nil) on a match and (false, non-nil error) otherwise.
func findStringInList(item string, list []string) (bool, error) {
	defer recovery("findStringInList")
	// defer gl.Leave(gl.Enter())
	for _, candidate := range list {
		if candidate == item {
			return true, nil
		}
	}
	return false, errors.New("Group not found in Conditional Data")
}
// init runs the start-up stages in order: logging, command-line arguments,
// configuration and the srv layer. When a stage reports failure the process
// exits with that stage's status code (11, 12, 13 or 14).
func init() {
	defer recovery("init")

	if !initGenLog() {
		gl.Log(gl.All, 101, "FAILURE: genlog initialization")
		os.Exit(11)
	}
	gl.Log(gl.LVL1, "SUCCESS: genlog initialization")

	if !initArgs() {
		gl.Log(gl.All, "FAILURE: argument initialization")
		os.Exit(12)
	}
	gl.Log(gl.All, "SUCCESS: argument initialization")

	if !initConfigurations() {
		gl.Log(gl.All, "FAILURE: configuration initialization")
		os.Exit(13)
	}
	gl.Log(gl.All, "SUCCESS: configuration initialization")

	if !initSrv() {
		gl.Log(gl.All, "FAILURE: srv initialization")
		os.Exit(14)
	}
	gl.Log(gl.LVL1, "SUCCESS: srv initialization")

	gl.Log(gl.All, "SUCCESS: initialization complete")
}
// initArgs registers and parses the command-line flags -L (log level),
// -template and -account, storing them in the package-level logLevel,
// template and account variables.
//
// It panics when -template or -account is empty; the deferred recovery
// handler logs the panic and terminates the process. It returns true
// otherwise.
func initArgs() bool {
	defer recovery("initArgs")
	// defer gl.Leave(gl.Enter())
	gl.Log(gl.LVL1, "Initialize Arguments Begin")

	// flag.StringVar takes (pointer, name, default value, usage); every flag
	// defaults to the empty string.
	flag.StringVar(&logLevel, "L", "", "Log level")
	flag.StringVar(&template, "template", "", "USER template to query on for connection data")
	flag.StringVar(&account, "account", "", "USER account to query on for connection data")

	gl.Log(gl.LVL2, "Begin flag.Parse()")
	flag.Parse()
	gl.Log(gl.LVL2, "End flag.Parse()")

	// flag.Parsed() is always true once flag.Parse has returned, so the
	// values can be logged unconditionally.
	gl.Log(gl.LVL1, "logLevel [%s]", logLevel)
	gl.Log(gl.LVL1, "template [%s]", template)
	gl.Log(gl.LVL1, "account [%s]", account)

	if template == "" {
		gl.Log(gl.LVL1, "required argument [-template] missing")
		panic("no user template defined,please set the -template flag")
	}
	if account == "" {
		gl.Log(gl.LVL1, "required argument [-account] missing")
		panic("no user account defined,please set the -account flag")
	}

	gl.Log(gl.LVL1, "Initialize Arguments Completed")
	return true
}
// initConfigurations is the configuration stage of init; init exits with
// status 13 when it returns false.
// The body was redacted in this listing, so `response` is declared in the
// omitted code.
// NOTE(review): presumably this is where kafkaConnData and condData are
// populated - confirm against the full source.
func initConfigurations() bool {
defer recovery("initConfigurations")
// defer gl.Leave(gl.Enter())
// CODE REDACTED
return response
}
// initGenLog initialises the genlog logging library and stores a call ID in
// the package-level appCallID. It returns false when gl.Init fails and true
// otherwise.
func initGenLog() bool {
if !gl.Init() {
gl.Log(gl.LVL1,"Error: Failed to initialize logging")
return false
}
// The call ID comes from the C helper strange_name.
appCallID = C.GoString(C.strange_name())
// NOTE(review): gl.ID is called twice on File2|File3, first with nil and,
// after gl.Conf, with the call ID. The first call presumably clears any
// previous ID; the order looks deliberate - confirm against the genlog docs.
gl.ID(gl.File2|gl.File3,nil,nil)
gl.Conf(gl.File2,gl.NoChg,gl.NoChgPtr,gl.Date2Def,nil)
gl.ID(gl.File2|gl.File3,gl.SToCS(appCallID),nil)
gl.Log(gl.LVL3,"Genlog Init Completed")
return true
}
func initSrv() bool {
var vt C.int
defer recovery("initSrv")
// defer gl.Leave(gl.Enter())
// Get vterm
if vt = C.get_vterm_id(C.TRUE); vt < 0 {
gl.Log(gl.LVL1,"Error: Failed to get vterm")
return false
}
if C.srv_init(nil,0) < 0 {
gl.Log(gl.LVL1,"Error: Failed srv init")
return false
}
return true
}
// logKafkaConnectionData takes a connection record by value together with a
// description string. The body was redacted in this listing.
// NOTE(review): presumably it writes the fields of kcd to the log, labelled
// with desc - confirm against the full source.
func logKafkaConnectionData(kcd KafkaConnectionData,desc string) {
defer recovery("logKafkaConnectionData")
// defer gl.Leave(gl.Enter())
// CODE REDACTED
}
// main supervises the Kafka consumers. Roughly every two seconds it scans
// kafkaConnData and starts a saramaConsumer goroutine for every entry that is
// not running and still has StatusNew. It keeps doing so until a consumer sets
// the package-level done flag, then waits for all consumers to return and
// exits with status 0.
func main() {
	defer recovery("main")
	// defer gl.Leave(gl.Enter())

	const (
		scanInterval = 2 * time.Second        // minimum time between scans of kafkaConnData
		pollInterval = 100 * time.Millisecond // pause between checks of the done flag
	)

	lastScan := time.Now()
	// NOTE(review): done is written by the consumer goroutines without
	// synchronisation; consider an atomic flag or a channel.
	for !done {
		if time.Since(lastScan) > scanInterval {
			for idx := range kafkaConnData {
				kcd := &kafkaConnData[idx]
				if kcd.Running || kcd.Status != StatusNew {
					continue
				}
				kcd.Running = true
				kcd.Status = StatusConnected
				wg1.Add(1)
				go saramaConsumer(kcd)
			}
			lastScan = time.Now()
		}
		// Pause between iterations; without it this loop spins continuously
		// and keeps a CPU core fully busy.
		time.Sleep(pollInterval)
	}

	wg1.Wait()
	gl.Log(gl.LVL1, "Exiting application normally")
	os.Exit(0)
}
// rangeMap is called by buildBlockFromJSON with the block built so far, the
// decoded JSON object and an empty prefix; it returns the resulting block and
// an error. The body was redacted in this listing.
// NOTE(review): presumably it walks myMap (recursing into nested objects with
// prefix as the key path) and adds each value to block - confirm against the
// full source.
func rangeMap(block string,myMap map[string]interface{},prefix string) (string,error) {
defer recovery("rangeMap")
// defer gl.Leave(gl.Enter())
// CODE REDACTED
return block,nil
}
// recovery is meant to be deferred directly (defer recovery("name")). When the
// surrounding function is panicking it logs input, the panic value and the
// current stack trace, then terminates the process with exit status 31. When
// there is no panic it does nothing.
func recovery(input string) {
	// defer gl.Leave(gl.Enter())
	cause := recover()
	if cause == nil {
		return
	}
	gl.Log(gl.All, "%s,Exception: %v\n%s\n", input, cause, debug.Stack())
	os.Exit(31)
}
// saramaConsumer runs one Sarama consumer group for the connection described
// by kcd and blocks until the consumer is shut down.
//
// It builds the Sarama configuration (version, rebalance strategy, SASL/PLAIN
// credentials), creates the consumer group client, runs the Consume loop in a
// separate goroutine and waits for either context cancellation or one of the
// registered signals. On shutdown it sets the package-level done flag, cancels
// the context, waits for the Consume loop and closes the client.
//
// It must be started with wg1 already incremented; wg1.Done is called on
// return. Unrecoverable errors panic and are handled by the deferred recovery
// handler, which logs them and exits the process.
func saramaConsumer(kcd *KafkaConnectionData) {
	id := kcd.ID
	defer recovery("saramaConsumer")
	defer wg1.Done()
	gl.Log(gl.LVL1, "ID[%d]. Starting a new Sarama consumer", id)

	if kcd.Verbose {
		sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
	}

	/**
	 * Construct a new Sarama configuration.
	 * The Kafka cluster version has to be defined before the consumer/producer is initialized.
	 */
	config := sarama.NewConfig()
	config.Version = kcd.Version

	switch kcd.Assignor {
	case "sticky":
		config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
	case "roundrobin":
		config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
	case "range":
		config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
	default:
		gl.Log(gl.LVL1, "ID[%d]. Unrecognized consumer group partition assignor [%s]", id, kcd.Assignor)
		log.Panicf("Unrecognized consumer group partition assignor: %s", kcd.Assignor)
	}

	// NOTE(review): TLS is left disabled. Managed Confluent clusters normally
	// expose SASL over TLS only, in which case config.Net.TLS.Enable = true
	// (and the TLS listener port in kcd.Brokers) is required - confirm for
	// this cluster.
	// config.Net.TLS.Enable = false
	// config.Net.SASL.Handshake = false
	// config.Net.SASL.AuthIdentity = kcd.APIKey
	config.Net.SASL.Enable = true
	config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
	config.Net.SASL.User = kcd.APIKey
	config.Net.SASL.Password = kcd.APISecret
	config.ClientID = "CSGIM"
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	// A validation failure is only logged here; sarama.NewConsumerGroup below
	// receives the same config and reports its own error.
	if err := config.Validate(); err != nil {
		gl.Log(gl.LVL1, "Configuration validation failed,err [%s]", err)
	} else {
		// NOTE(review): printing the whole config with %v includes the SASL
		// password set above; consider logging selected fields only.
		gl.Log(gl.LVL1, "Configuration validation good,config [%v]", config)
	}

	/**
	 * Setup a new Sarama consumer group
	 */
	consumer := Consumer{
		ready: make(chan bool),
	}

	ctx, cancel := context.WithCancel(context.Background())
	// Release the context on every exit path, including the panics below.
	defer cancel()

	client, err := sarama.NewConsumerGroup(kcd.Brokers, kcd.Group, config)
	if err != nil {
		gl.Log(gl.LVL1, "ID[%d]. Error creating consumer group client [%s]", id, err)
		log.Panicf("Error creating consumer group client: %v", err)
	}

	wg2.Add(1)
	go func(kcd *KafkaConnectionData) {
		// recover only works in the panicking goroutine, so this goroutine
		// needs its own handler for the log.Panicf below.
		defer recovery("saramaConsumer.consume")
		defer wg2.Done()
		for {
			// `Consume` should be called inside an infinite loop,when a
			// server-side rebalance happens,the consumer session will need to be
			// recreated to get the new claims
			dir = kcd.ExternalService
			if err := client.Consume(ctx, kcd.Topics, &consumer); err != nil {
				gl.Log(gl.LVL1, "Error from consumer: %s", err)
				log.Panicf("Error from consumer: %v", err)
			}
			// check if context was cancelled,signaling that the consumer should stop
			if ctx.Err() != nil {
				gl.Log(gl.LVL1, "ID[%d]. Error from context: %s", kcd.ID, ctx.Err())
				return
			}
			consumer.ready = make(chan bool)
		}
	}(kcd)

	<-consumer.ready // Await till the consumer has been set up
	gl.Log(gl.LVL1, "ID[%d]. Sarama consumer up and running!...", id)

	sigterm := make(chan os.Signal, 1)
	// signal.Notify(sigterm,syscall.SIGINT,syscall.SIGTERM)
	signal.Notify(sigterm, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGTSTP)

	select {
	case <-ctx.Done():
		done = true
		gl.Log(gl.LVL1, "ID[%d]. terminating: context cancelled", id)
	case <-sigterm:
		done = true
		gl.Log(gl.LVL1, "ID[%d]. terminating sarama: via signal", id)
	}

	cancel()
	wg2.Wait()

	if err = client.Close(); err != nil {
		gl.Log(gl.LVL1, "ID[%d]. Error closing client [%s]", id, err)
		log.Panicf("Error closing client: %s", err)
	}
}
// sendToExternalService is called by buildBlockFromJSON with the finished
// block and the message's call ID; a non-nil return is logged there as a
// failure. The body was redacted in this listing, so `err` is declared in the
// omitted code.
// NOTE(review): presumably the target service is selected through the
// package-level dir variable - confirm against the full source.
func sendToExternalService(block string,callID string) error {
defer recovery("sendToExternalService")
// CODE REDACTED
return err
}
// sToCS converts a Go string to a C string: it copies s into a new byte slice
// that is one byte longer (the extra zero byte acts as the terminating NUL)
// and returns a pointer to its first byte. The buffer is allocated with make,
// i.e. it is Go-managed memory rather than C-allocated memory.
func sToCS(s string) *C.char {
	defer recovery("sToCS")
	buf := make([]byte, len(s)+1)
	copy(buf, s)
	return (*C.char)(unsafe.Pointer(&buf[0]))
}
解决方法
暂未找到可以解决该程序问题的有效方法,小编正在努力寻找和整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)