问题描述
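我正在尝试改编 github.com/Shopify/sarama 的消费者组示例中的代码,并试图添加一个单元测试,用来测试 ConsumeClaim 方法中 session.MarkMessage() 的功能(https://github.com/Shopify/sarama/blob/5466b37850a38f4ed6d04b94c6f058bd75032c2a/examples/consumergroup/main.go#L160)。
这是我改编后的代码,其中包含 consume() 函数: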
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"sync"
"syscall"
"github.com/Shopify/sarama"
)
var (
// addrs lists the Kafka broker addresses passed to the sarama constructors.
addrs = []string{"localhost:9092"}
// topic is the Kafka topic that consume subscribes the consumer group to.
topic = "my-topic"
)
// main starts the consumer group, waits until it is ready, and then blocks
// until the context is cancelled or SIGINT/SIGTERM arrives. It then shuts the
// consumer down in dependency order.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	// Safety net for early exits; calling cancel more than once is harmless.
	defer cancel()

	var wg sync.WaitGroup
	consumer := &Consumer{ready: make(chan bool)}
	closeGroup := consume(ctx, &wg, consumer)

	// Setup closes ready once the first consumer session has been established.
	<-consumer.ready
	log.Println("Sarama consumer up and running!")

	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-ctx.Done():
		log.Println("terminating: context cancelled")
	case <-sigterm:
		log.Println("terminating: via signal")
	}

	// Shutdown order matters: cancel the context so the consume loop stops,
	// wait for that goroutine to finish, and only then close the group.
	// Stacked defers would run in the reverse order (close, wait, cancel),
	// closing the group under a still-running loop and waiting on a goroutine
	// whose context has not been cancelled yet.
	cancel()
	wg.Wait()
	closeGroup()
}
// consume creates the "my-group" consumer group against addrs and starts a
// goroutine that keeps consuming topic with the given handler until ctx is
// cancelled or the group is closed. The goroutine is registered on wg so the
// caller can wait for it. The returned function closes the consumer group.
//
// A failure to create the group terminates the process via log.Fatalf.
func consume(ctx context.Context, wg *sync.WaitGroup, consumer *Consumer) (close func()) {
	config := sarama.NewConfig()
	config.Version = sarama.V0_11_0_2 // The version has to be at least V0_10_2_0 to support consumer groups
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	consumerGroup, err := sarama.NewConsumerGroup(addrs, "my-group", config)
	if err != nil {
		log.Fatalf("NewConsumerGroup: %v", err)
	}

	wg.Add(1)
	go func() {
		defer wg.Done()
		// Consume returns whenever a session ends, so it is called in a loop
		// to join the next session.
		for {
			err := consumerGroup.Consume(ctx, []string{topic}, consumer)
			if err == sarama.ErrClosedConsumerGroup {
				// The group was closed by the caller: a normal shutdown,
				// not a failure worth panicking over.
				return
			}
			if err != nil {
				log.Panicf("Consume: %v", err)
			}
			if ctx.Err() != nil {
				return
			}
			// Setup closes ready at the start of each session, so a fresh
			// channel is needed before the next Consume call.
			// NOTE(review): this write is not synchronized with readers of
			// consumer.ready in other goroutines — confirm this is acceptable.
			consumer.ready = make(chan bool)
		}
	}()

	return func() {
		if err := consumerGroup.Close(); err != nil {
			log.Panicf("Close: %v", err)
		}
	}
}
// Consumer represents a Sarama consumer group consumer.
// It is the handler value passed to ConsumerGroup.Consume.
type Consumer struct {
// ready is closed by Setup to signal that a consumer session has started.
ready chan bool
// handle, when non-nil, is called by ConsumeClaim with each message's value.
// A non-nil error makes ConsumeClaim return before the message is marked.
handle func([]byte) error
}
// Setup is invoked when a new session starts, ahead of any ConsumeClaim call.
// It closes the ready channel so that anyone waiting on it learns the
// consumer is up. The session argument is not used.
func (c *Consumer) Setup(_ sarama.ConsumerGroupSession) error {
	close(c.ready)
	return nil
}
// Cleanup is invoked when a session ends, after every ConsumeClaim goroutine
// has exited. This consumer holds nothing that needs releasing, so it always
// reports success.
func (c *Consumer) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
//
// Each message is logged and, when a handle callback is configured, passed to
// it. A message is marked on the session only after handle succeeds (or when
// no handle is set). If handle fails, the loop stops and the wrapped error is
// returned without marking that message. It returns nil once the Messages
// channel is closed.
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		log.Printf("Message claimed: value = %s,timestamp = %v,topic = %s", message.Value, message.Timestamp, message.Topic)
		if c.handle != nil {
			if err := c.handle(message.Value); err != nil {
				// The original format string had two verbs but only one
				// argument; include the message value and wrap the cause.
				return fmt.Errorf("handle message %s: %w", message.Value, err)
			}
		}
		session.MarkMessage(message, "")
	}
	return nil
}
以下是我为此编写的一些单元测试:
package main
import (
"context"
"fmt"
"log"
"sync"
"testing"
"time"
"github.com/Shopify/sarama"
"github.com/stretchr/testify/require"
"gotest.tools/assert"
)
// TestConsume publishes a single message to topic and checks that the consumer
// group can be started and shut down cleanly. It needs a Kafka broker
// reachable at addrs.
func TestConsume(t *testing.T) {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	producer, err := sarama.NewSyncProducer(addrs, config)
	require.NoError(t, err)
	defer func() {
		if err := producer.Close(); err != nil {
			t.Errorf("closing producer: %v", err)
		}
	}()

	partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.ByteEncoder([]byte("foobar")),
	})
	require.NoError(t, err)
	t.Logf("Sent message to partition %d with offset %d", partition, offset)

	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	consumer := &Consumer{ready: make(chan bool)}
	// consume takes the WaitGroup as its second argument; without it the
	// wg.Wait below would not wait for the consume goroutine.
	closeGroup := consume(ctx, &wg, consumer)

	<-consumer.ready
	log.Println("Sarama consumer up and running!")
	// Presumably gives the consumer time to claim the message before shutdown.
	time.Sleep(1 * time.Second)

	cancel()
	wg.Wait()
	closeGroup()
}
func TestConsumeTwice(t *testing.T) {
config := sarama.NewConfig()
config.Producer.Return.Successes = true
producer,err)
data1,data2 := "foobar1","foobar2"
for _,data := range []string{data1,data2} {
partition,err := producer.SendMessage(&sarama.ProducerMessage{
Topic: topic,Key: sarama.StringEncoder("foobar"),Value: sarama.StringEncoder(data),})
require.NoError(t,err)
t.Logf("Sent message to partition %d with offset %d",offset)
}
ctx,cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
messageReceived := make(chan []byte)
consumer := &Consumer{
ready: make(chan bool),handle: func(data []byte) error {
messageReceived <- data
fmt.Printf("Received message: %s\n",data)
return nil
},}
close := consume(ctx,consumer)
<-consumer.ready
log.Println("Sarama consumer up and running!")
for i := 0; i < 2; i++ {
data := <-messageReceived
switch i {
case 0:
assert.Equal(t,data1,string(data))
case 1:
assert.Equal(t,data2,string(data))
}
}
cancel()
wg.Wait()
close()
}
可以先在类似 johnnypark/kafka-zookeeper 这样的 Docker 容器中运行 Kafka 和 Zookeeper,然后再运行测试(被测的 MarkMessage 调用出自 https://github.com/Shopify/sarama/blob/5466b37850a38f4ed6d04b94c6f058bd75032c2a/examples/consumergroup/main.go#L160),如下所示:
docker run -p 2181:2181 -p 9092:9092 -e ADVERTISED_HOST=127.0.0.1 -e NUM_PARTITIONS=10 johnnypark/kafka-zookeeper
我正在努力解决的问题是:如果我注释掉行
session.MarkMessage(message,"")
测试仍然通过。根据 sarama 的文档(ConsumerGroupSession),MarkMessage
会将消息标记为已消费,但我该如何在单元测试中验证这一点?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)