问题描述
我有一个基本的生产者应用程序和一个消费者应用程序。如果我同时运行这两个程序,并且都开始专注于各自的主题,那么我的工作系统很好。我的想法是,如果我启动了生产者并发送了一条消息,那么我将能够启动消费者,并让其接受该消息。我错了。
除非两者都已启动并且正在运行,否则我会丢失消息(否则它们不会被消耗)。
我的消费类应用看起来像这样...
Uri uri = new Uri("http://localhost:9092");
KafkaOptions options = new KafkaOptions(uri);
brokerRouter brokerRouter = new brokerRouter(options);
Consumer consumer = new Consumer(new ConsumerOptions(receivetopic,brokerRouter));
List<OffsetResponse> offset = consumer.GetTopicOffsetAsync(receivetopic,100000).Result;
IEnumerable<OffsetPosition> t = from x in offset select new OffsetPosition(x.PartitionId,x.Offsets.Max());
consumer.SetoffsetPosition(t.ToArray());
IEnumerable<KafkaNet.Protocol.Message> msgs = consumer.Consume();
foreach (KafkaNet.Protocol.Message msg in msgs)
{
do some stuff here based on the message received
}
除非两行之间都有代码,否则每次启动应用程序时它都会从头开始。 什么是管理主题偏移的正确方法,以便断开连接后消费消息?
如果我跑步
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic chat-message-reply-XXX consumer-property fetch-size=40000000 --from-beginning
我可以看到消息,但是当我将应用程序连接到该主题时,consumer.Consume()不会拾取它尚未看到的消息。我已经尝试过,并且没有运行上面的bat文件,看看是否有任何区别。当我查看consumer.SetoffsetPosition(t.ToArray())调用(特别是t)时,它表明偏移量是该主题所有消息的计数。
请帮助
解决方法
将auto.offset.reset
中的ConsumerOptions
配置设置为earliest
。消费者组启动消费消息时,它将从最新的偏移量开始消费,因为auto.offset.reset
的默认值是最新的。
但是我现在查看了 kafka-net API,它没有AutoOffsetReset
属性,并且在消费者中的配置似乎还不够。它还缺少方法摘要的文档。
我建议您使用 Confluent .NET Kafka Nuget软件包,因为它由 Confluent 本身拥有。
此外,为什么要调用GetTopicOffsets
并在使用者中再次设置该偏移量。我认为在配置使用者时,您应该只使用Consume()
开始阅读消息。
尝试一下:
static void Main(string[] args)
{
var uri = new Uri("http://localhost:9092");
var kafkaOptions = new KafkaOptions(uri);
var brokerRouter = new BrokerRouter(kafkaOptions);
var consumerOptions = new ConsumerOptions(receivedTopic,brokerRouter);
var consumer = new Consumer(consumerOptions);
foreach (var msg in consumer.Consume())
{
var value = Encoding.UTF8.GetString(msg.Value);
// Process value here
}
}
此外,在KafkaOptions
和ConsumerOptions
中启用日志,它们将为您提供很多帮助:
var kafkaOptions = new KafkaOptions(uri)
{
Log = new ConsoleLog()
};
var consumerOptions = new ConsumerOptions(topic,brokerRouter)
{
Log = new ConsoleLog()
});
,
我转而使用Confluent的C#.NET软件包,现在可以使用了。