Spring Boot REST服务Kafka主题无法使用指定数量的消息

问题描述

我有一个简单的Spring启动服务,该服务称为按需服务,使用来自主题的指定数量的消息。要消耗的消息数作为参数传递。每30分钟会致电一次服务。每个邮件大小约为1.6 kb。当我调用服务并传递参数3000时,我期望返回30万条消息,但是每次我总是得到1100或1200条消息。我只有一个分区有一个主题。这是一项按需服务,因此不使用while循环,并将轮询时间设置为30秒。但是响应会在10秒内返回,即使MAX_POLL_RECORDS_CONfig为3000、4000或5000,返回的记录数也为〜1200。请问,即使轮询时间为30秒,消息上是否有大小限制,我该如何实现更多吞吐量或接近极限。每次调用服务时,下面的命令都会执行。以下是该服务的调用方式: http://example.com/messages?limit=5000

Properties p = new Properties();
//limit is the value coming in as a query paramter and can be 3000,4000 or 5000
p.put(ConsumerConfig.MAX_POLL_RECORDS_CONfig,limit);
p.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONfig,15000);
p.put(ConsumerConfig.FETCH_MIN_BYTES_CONfig,22 * 1024 * 1024);
p.put(ConsumerConfig.FETCH_MIN_BYTES_CONfig,50 * 1024 * 1024);
p.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONfig,500);
p.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONfig,50 * 1024 * 1024);

consumer = consumerFactory.createConsumer("my-group-id",null,p);
consumer.assign(Collections.singleton(new TopicPartition("test-topic",0)));


ConsumerRecords<Object,Object> consumerRecords = consumer.poll(Duration.ofSeconds(30));

// processing the messages somehow I always get ~1200 messages 
.........................................
.........................................
.................................
consumer.commitAsync();

// return list of messages

谢谢

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...