Sarama Kafka库:如何对消费者组的session.MarkMessage进行单元测试?

问题描述

我正在尝试改编来自github.com/Shopify/sarama的消费者组示例中的代码,并努力在session.MarkMessage()方法中添加一个单元测试来测试ConsumeClaim的功能({ {3}})。

这是我经过改编的带有consume()函数的代码:

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"
    "sync"
    "syscall"

    "github.com/Shopify/sarama"
)

var (
    addrs = []string{"localhost:9092"}
    topic = "my-topic"
)

func main() {
    ctx,cancel := context.WithCancel(context.Background())
    defer cancel()

    var wg sync.WaitGroup
    defer wg.Wait()

    consumer := &Consumer{ready: make(chan bool)}

    close := consume(ctx,&wg,consumer)
    defer close()

    <-consumer.ready
    log.Println("Sarama consumer up and running!")

    sigterm := make(chan os.Signal,1)
    signal.Notify(sigterm,syscall.SIGINT,syscall.SIGTERM)
    select {
    case <-ctx.Done():
        log.Println("terminating: context cancelled")
    case <-sigterm:
        log.Println("terminating: via signal")
    }
}

func consume(ctx context.Context,wg *sync.WaitGroup,consumer *Consumer) (close func()) {
    config := sarama.NewConfig()
    config.Version = sarama.V0_11_0_2 // The version has to be at least V0_10_2_0 to support consumer groups
    config.Consumer.Offsets.Initial = sarama.OffsetOldest

    consumerGroup,err := sarama.NewConsumerGroup(addrs,"my-group",config)
    if err != nil {
        log.Fatalf("NewConsumerGroup: %v",err)
    }

    wg.Add(1)
    go func() {
        defer wg.Done()
        for {
            if err := consumerGroup.Consume(ctx,[]string{topic},consumer); err != nil {
                log.Panicf("Consume: %v",err)
            }
            if ctx.Err() != nil {
                return
            }
            consumer.ready = make(chan bool)
        }
    }()

    close = func() {
        if err := consumerGroup.Close(); err != nil {
            log.Panicf("Close: %v",err)
        }
    }
    return
}

// Consumer represents a Sarama consumer group consumer
type Consumer struct {
    ready  chan bool
    handle func([]byte) error
}

// Setup is run at the beginning of a new session,before ConsumeClaim
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
    // Mark the consumer as ready
    close(consumer.ready)
    return nil
}

// Cleanup is run at the end of a session,once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
    return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession,claim sarama.ConsumerGroupClaim) error {
    for message := range claim.Messages() {
        log.Printf("Message claimed: value = %s,timestamp = %v,topic = %s",message.Value,message.Timestamp,message.Topic)
        if consumer.handle != nil {
            if err := consumer.handle(message.Value); err != nil {
                return fmt.Errorf("handle message %s: %v",err)
            }
        }
        session.MarkMessage(message,"")
    }
    return nil
}

以下是我为此编写的一些单元测试:

package main

import (
    "context"
    "fmt"
    "log"
    "sync"
    "testing"
    "time"

    "github.com/Shopify/sarama"
    "github.com/stretchr/testify/require"
    "gotest.tools/assert"
)

func TestConsume(t *testing.T) {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    producer,err := sarama.NewSyncProducer(addrs,config)
    require.NoError(t,err)

    partition,offset,err := producer.SendMessage(&sarama.ProducerMessage{
        Topic: topic,Value: sarama.ByteEncoder([]byte("foobar")),})
    require.NoError(t,err)
    t.Logf("Sent message to partition %d with offset %d",partition,offset)

    ctx,cancel := context.WithCancel(context.Background())
    var wg sync.WaitGroup

    consumer := &Consumer{ready: make(chan bool)}

    close := consume(ctx,consumer)

    <-consumer.ready
    log.Println("Sarama consumer up and running!")

    time.Sleep(1 * time.Second)

    cancel()
    wg.Wait()
    close()
}

func TestConsumeTwice(t *testing.T) {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    producer,err)

    data1,data2 := "foobar1","foobar2"

    for _,data := range []string{data1,data2} {
        partition,err := producer.SendMessage(&sarama.ProducerMessage{
            Topic: topic,Key:   sarama.StringEncoder("foobar"),Value: sarama.StringEncoder(data),})
        require.NoError(t,err)
        t.Logf("Sent message to partition %d with offset %d",offset)
    }

    ctx,cancel := context.WithCancel(context.Background())
    var wg sync.WaitGroup

    messageReceived := make(chan []byte)
    consumer := &Consumer{
        ready: make(chan bool),handle: func(data []byte) error {
            messageReceived <- data
            fmt.Printf("Received message: %s\n",data)
            return nil
        },}

    close := consume(ctx,consumer)

    <-consumer.ready
    log.Println("Sarama consumer up and running!")

    for i := 0; i < 2; i++ {
        data := <-messageReceived
        switch i {
        case 0:
            assert.Equal(t,data1,string(data))
        case 1:
            assert.Equal(t,data2,string(data))
        }
    }

    cancel()
    wg.Wait()
    close()
}

可以在https://github.com/Shopify/sarama/blob/5466b37850a38f4ed6d04b94c6f058bd75032c2a/examples/consumergroup/main.go#L160这样的Docker容器中运行Kafka和Zookeeper之后运行测试,如下所示:

docker run -p 2181:2181 -p 9092:9092 -e ADVERTISED_HOST=127.0.0.1  -e NUM_PARTITIONS=10 johnnypark/kafka-zookeeper

我正在努力解决的问题是:如果我注释掉行

        session.MarkMessage(message,"")

测试仍然通过。根据{{​​3}},MarkMessage将邮件标记为已使用,但是我将如何在单元测试中对其进行测试?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...