Spring Boot REST service: commitSync fails on a Kafka topic

Problem description

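I have a simple Spring Boot service that is called on demand and consumes a specified number of messages from a topic. The number of messages to consume is passed as a parameter. The service is called once every 30 minutes. Each message is about 1.6 KB, and I always receive roughly 1,100 to 1,200 messages per call. There is one topic with a single partition, and the REST service is its only consumer. The service is invoked like this: http://example.com/messages?limit=2000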

private OutputResponse getNewMessages(String limit) throws Exception {
    
        System.out.println("***** START *****");
        
        final long start = System.nanoTime();
        
        int loopCntr = 0;   
        int counter = 0;
        OutputResponse outputResponse = new OutputResponse();       
        Output output = new Output();
        List<Output> rspListObject = new ArrayList<>();
        Consumer<Object,Object> consumer = null;
        String result = null;

        try {
            Properties p = new Properties();
            p.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "180000");
            p.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, limit);
            
            consumer = consumerFactory.createConsumer("my-group-id",null,p);            
            consumer.assign(Collections.singleton(new TopicPartition("test-topic",0)));

            while (loopCntr < 2) {
                loopCntr++;
                ConsumerRecords<Object,Object> consumerRecords = consumer.poll(Duration.ofSeconds(15));
                
                for (ConsumerRecord<Object,Object> record : consumerRecords)
                {
                    counter++; 
                    try
                    {
                        // serialize the record value to a JSON string
                        result = mapper.writeValueAsString(record.value());
                        // map the JSON string onto an Output object
                        output = mapper.readValue(result, Output.class);
                        rspListObject.add(output);                       
                    } catch (Exception e) {
                        logger.error(e);
                        insertToDB(record.value(),record.offset());
                    }
                }
            }

            outputResponse.setObjects(rspListObject);
            
            final long end = System.nanoTime();
            System.out.println("Took: " + ((end - start) / 1000000) + "ms");
            System.out.println("Took: " + (end - start) / 1000000000 + " seconds");

            // commit the offset of records to broker
            if (counter > 0) {
                consumer.commitSync();
            }
        } finally {
            try {
                System.out.println(" >>>>> closing the  consumer");
                if (consumer != null)
                    consumer.close();
            }catch(Exception e){
                //log message
            }
        }

        return outputResponse;
    }

This is what I have in application.yml:

spring:
  kafka:
    consumer:
      enable-auto-commit: false
      auto-offset-reset: latest
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        spring.json.trusted.packages: '*'
        max.poll.interval.ms: 300000
      group-id: my-group-id

ConsumerConfig values:

allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
check.crcs = true
client.dns.lookup = default
client.id = 
client.rack = 
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = my-group-id
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 180000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100

This is the error I get on commitSync. I tried consuming only 5 messages per poll(), and I tried setting p.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "180000"); but I get the same error.

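Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.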

Solution

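I believe the application below simulates your use case, but it does not exhibit the behavior you describe (as I would expect). When you assign topics/partitions manually, you should never see a rebalance.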

I suggest you run both applications and compare the DEBUG logs to find out what is going wrong.

@SpringBootApplication
public class So63713473Application {

    public static void main(String[] args) {
        SpringApplication.run(So63713473Application.class,args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so63713473").partitions(1).replicas(1).build();
     }

    @Bean
    public ApplicationRunner runner(ConsumerFactory<String,String> factory,KafkaTemplate<String,String> template) {
        String msg = new String(new byte[1600]);
        return args -> {
            while (true) {
                System.out.println("Hit enter to run a consumer");
                System.in.read();
                int count = 0;
                try (Consumer<String,String> consumer = factory.createConsumer("so63713473","")) {
                    IntStream.range(0,1200).forEach(i -> template.send("so63713473",msg));
                    consumer.assign(Collections.singletonList(new TopicPartition("so63713473",0)));
                    while (count < 1200) {
                        ConsumerRecords<String,String> records = consumer.poll(Duration.ofSeconds(5));
                        count += records.count();
                        System.out.println("Count=" + count);
                    }
                    consumer.commitSync();
                    System.out.println("Success");
                }
            }
        };
    }

}

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.fetch-min-size=1920000
spring.kafka.consumer.fetch-max-wait=1000

spring.kafka.producer.properties.linger.ms=50
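
The fetch-min-size value above appears to correspond to the 1200 messages of 1600 bytes each that the runner sends. As a quick arithmetic check, here is a minimal sketch; the class and method names are hypothetical, the 1 MiB figure is the max.partition.fetch.bytes value from the ConsumerConfig dump in the question, and record overhead is ignored, so the fetch count is only a rough lower bound.

```java
// Toy arithmetic only; no Kafka classes are involved.
// FetchSizing, batchBytes and minFetches are hypothetical names for this sketch.
class FetchSizing {

    /** Total payload bytes for a batch of equally sized messages. */
    static long batchBytes(int messageCount, int bytesPerMessage) {
        return (long) messageCount * bytesPerMessage;
    }

    /** Rough lower bound on fetches if each fetch carries at most maxFetchBytes of payload. */
    static long minFetches(long totalBytes, long maxFetchBytes) {
        return (totalBytes + maxFetchBytes - 1) / maxFetchBytes; // ceiling division
    }

    public static void main(String[] args) {
        long total = batchBytes(1200, 1600);
        System.out.println("payload bytes = " + total);
        System.out.println("fetches at 1 MiB each >= " + minFetches(total, 1_048_576L));
    }
}
```

Under these assumptions the whole batch does not fit into a single 1 MiB partition fetch, which is consistent with needing more than one poll() to collect all the messages.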

EDIT

I can reproduce your problem by adding a second (auto-assigned) consumer to the same group.

@KafkaListener(id = "so63713473",topics = "so63713473")
public void listen(String in) {
    System.out.println(in);
}

2020-09-08 16:40:15.828 ERROR 88813 --- [           main] o.s.boot.SpringApplication               : Application run failed

java.lang.IllegalStateException: Failed to execute ApplicationRunner
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:789) [spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:776) [spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:322) [spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) [spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) [spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at com.example.demo.So63713473Application.main(So63713473Application.java:25) [classes/:na]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

You cannot mix manual assignment and automatic assignment in the same group.
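
One way to act on this, sketched under assumptions: give the consumer that uses assign() a group id that no subscribing listener uses, and fail fast if the two collide. The class name, the listenerGroupIds parameter and the "-on-demand" suffix in the usage note are hypothetical; the property keys are the standard Kafka consumer configuration names. This only builds the properties and does not create a consumer.

```java
import java.util.Properties;
import java.util.Set;

// Hypothetical helper: builds consumer properties for a manually assigned,
// on-demand consumer and refuses a group id already used by a listener.
class OnDemandConsumerProps {

    static Properties build(String bootstrapServers,
                            String groupId,
                            int limit,
                            Set<String> listenerGroupIds) {
        // Guard for the rule above: no sharing a group between assign() and subscribing consumers.
        if (listenerGroupIds.contains(groupId)) {
            throw new IllegalArgumentException(
                    "group.id '" + groupId + "' is already used by a subscribing listener");
        }
        Properties p = new Properties();
        p.put("bootstrap.servers", bootstrapServers);
        p.put("group.id", groupId);
        p.put("enable.auto.commit", "false");             // offsets are committed with commitSync()
        p.put("max.poll.records", String.valueOf(limit)); // cap on records returned by one poll()
        return p;
    }
}
```

For example, build("localhost:9092", "so63713473-on-demand", 2000, Set.of("so63713473")) returns the properties, while passing "so63713473" as the group id throws, because the @KafkaListener above already uses that id as its group.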