问题描述
我有一个Kafka ConsumerGroup(Golang / Segmentio),其中一个读者是单元测试用例的一部分
r := kafka.NewReader(kafka.ReaderConfig{
brokers: []string{"localhost:9092"},Topic: "test",CommitInterval: time.Second,// flushes commits to Kafka every second
MaxWait: 10 * time.Millisecond,GroupID: "test_logs",StartOffset: kafka.LastOffset,})
阅读器能够读取消息,但是从主题读取所有消息后,它正在等待新消息,并且测试用例在30秒后超时。
如果我设置了context.WithTimeout(ctx,20*time.Second)
,则阅读器将无法阅读该主题中的任何消息。
所以我该如何从主题中读取消息并继续进行操作,而不是等待新消息。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)