RecordTooLargeException in Kafka

Problem Description

Below is my Kafka publishing code, which throws a RecordTooLargeException. I have tried every solution I could find on Stack Overflow, covering properties such as max.request.size, but none of them had any effect. The exact stack trace is:

Caused by: org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 1696090 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.

Can someone help me resolve this issue? Thanks in advance.

    // KafkaTemplate injected by Spring (assumed to use String key/value serializers)
    @Autowired
    private KafkaTemplate<String, String> template;

    @Override
    public void run(String... args) throws Exception {

        // Build a large JSON array: 8000 copies of one record easily
        // exceed the producer's default 1 MB max.request.size
        JSONArray array = new JSONArray();

        for (int i = 0; i < 8000; i++) {
            JSONObject object = new JSONObject();
            object.put("no", 1);
            object.put("name", "Kella Vivek");
            object.put("salary", 1000);
            object.put("address", "2-143");
            object.put("city", "gpm");
            object.put("pin", 534316);
            object.put("dist", "west");
            object.put("state", "ap");
            object.put("username", "mff");
            object.put("password", "mff");
            array.add(object);
        }

        // Pretty printing inflates the serialized payload even further
        ObjectMapper mapper = new ObjectMapper();
        String string = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(array);

        // Fails here: the serialized message (1696090 bytes) exceeds max.request.size
        template.send("consume", string);

    }

Solution

This is not a Spring problem. You need to tune a few parameters on the Kafka producer to make this work.
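That said, if you came here from Spring Boot's KafkaTemplate, note that producer settings can be passed through to the underlying Kafka producer via the spring.kafka.producer.properties.* keys in application.properties. A minimal sketch, assuming the same 100 MB limit used in the rest of this answer:

# application.properties - forwarded verbatim to the underlying Kafka producer
spring.kafka.producer.properties.max.request.size=104857600
spring.kafka.producer.properties.buffer.memory=104857600

This is a common reason why setting max.request.size appears to have no effect: the value must reach the producer that KafkaTemplate actually uses, not just sit in an unrelated Properties object.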

Now, to answer your question, here is what I did to allow sending messages of up to 100 MB.

Create properties for buffer.memory, message.max.bytes, and max.request.size, and set the sizes according to your requirements.

Create the producer with these properties:

// Producer-side tuning: 104857600 bytes = 100 MB
Properties producerProperties = new Properties();
producerProperties.put("buffer.memory", 104857600);      // total memory available for buffering records
producerProperties.put("message.max.bytes", 104857600);  // note: this is actually a broker/topic setting, not a producer one
producerProperties.put("max.request.size", 104857600);   // the per-request cap that triggered the exception
producerProperties.put("bootstrap.servers", kafkaBootstrapServers);
producerProperties.put("acks", "all");
producerProperties.put("retries", 0);
producerProperties.put("batch.size", 16384);
producerProperties.put("linger.ms", 1);
producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Now create the producer and send:

KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties);

You also need to make sure that the target broker supports large messages as well; I configured the server side to accept them.
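A minimal broker-side sketch, assuming the same 100 MB cap as above (message.max.bytes raises the broker's record size limit, and replica.fetch.max.bytes must be at least as large so follower replicas can still fetch those records):

# server.properties - assumed values matching the producer settings above
message.max.bytes=104857600
replica.fetch.max.bytes=104857600

Finally, the send itself goes through a small helper: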

private static void sendKafkaMessage(String payload, KafkaProducer<String, String> producer, String topic)
{
    logger.info("Sending Kafka message: " + payload);
    producer.send(new ProducerRecord<>(topic, payload));
}
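For completeness, a usage sketch tying this back to the question (the topic name "consume" and the payload string come from the question's code; flush() and close() are standard KafkaProducer calls that ensure the buffered record is actually transmitted before shutdown):

sendKafkaMessage(string, producer, "consume");  // "string" is the large JSON payload built earlier
producer.flush();  // block until buffered records have been sent to the broker
producer.close();  // release the client's network resources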
