问题描述
以下是kafka发布代码,该代码提供了 RecordTooLargeException 例外。尝试了stackoverflow中给出的所有可能解决方案,这些解决方案提供了有关诸如max.request.size等不同属性的信息,但是没有任何效果。确切的堆栈跟踪为,原因是:org.springframework.kafka.KafkaException:发送失败;嵌套异常是org.apache.kafka.common.errors.RecordTooLargeException:序列化时,消息为1696090字节,大于1048576,这是max.request.size配置的值。有人可以帮助我解决此问题吗? 预先感谢。
@Override
public void run(String... args) throws Exception {
JSONArray array = new JSONArray();
for (int i = 0; i < 8000; i++) {
JSONObject object = new JSONObject();
object.put("no",1);
object.put("name","Kella Vivek");
object.put("salary",1000);
object.put("address","2-143");
object.put("city","gpm");
object.put("pin",534316);
object.put("dist","west");
object.put("state","ap");
object.put("username","mff");
object.put("password","mff");
array.add(object);
}
ObjectMapper mapper = new ObjectMapper();
String string = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(array);
template.send("consume",string);
}
解决方法
这不是弹簧问题。您需要在 kafka 生产者中调整一些参数才能使其正常工作。
现在回答您的问题,我执行了以下操作以允许发送 100 mb 消息。
根据您的要求为 buffer.memory、message.max.bytes 和 max.request.size 创建属性并设置大小。
SQL> select * from test;
UPDATE_DAT USER_ID
---------- ----------
25/05/2021 3456
25/05/2021 3456
25/05/2021 56478
SQL> with calendar (datum) as
2 (select trunc(sysdate) - level + 1
3 from dual
4 connect by level <= 3
5 )
6 select c.datum as update_date,7 count(distinct t.user_id) count_user_id
8 from calendar c left join test t on t.update_date = c.datum
9 group by c.datum
10 order by c.datum desc;
UPDATE_DAT COUNT_USER_ID
---------- -------------
25/05/2021 2
24/05/2021 0
23/05/2021 0
SQL>
使用上述属性创建生产者:
Properties producerProperties = new Properties();
producerProperties.put("buffer.memory",104857600);
producerProperties.put("message.max.bytes",104857600);
producerProperties.put("max.request.size",104857600);
producerProperties.put("bootstrap.servers",kafkaBootstrapServers);
producerProperties.put("acks","all");
producerProperties.put("retries",0);
producerProperties.put("batch.size",16384);
producerProperties.put("linger.ms",1);
producerProperties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
producerProperties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
现在发送。
KafkaProducer<String,String> producer = new KafkaProducer<>(producerProperties);
您还需要确保目标服务器也支持大消息。我在服务器中配置了以下内容以支持大消息。
private static void sendKafkaMessage(String payload,KafkaProducer<String,String> producer,String topic)
{
logger.info("Sending Kafka message: " + payload);
producer.send(new ProducerRecord<>(topic,payload));
}