spring-kafka - SeekToCurrentErrorHandler behaves erratically and the consumer leaves the group when multiple records from different partitions fail

Problem description

I am working on recovering failed records with a custom recoverer attached to a SeekToCurrentErrorHandler. Our project uses spring-kafka 2.2.6.RELEASE (with Spring Boot). With that version, a consumer (concurrency 1) that has multiple failed records from different partitions gets stuck in an infinite loop (https://github.com/spring-projects/spring-kafka/issues/1237). Since that issue is fixed in later releases, I switched to 2.3.3.RELEASE and ran into the different problem described below (I also tried 2.5.4.RELEASE and ended up with the same issue).

So the issue described below occurs with both spring-kafka 2.3.3.RELEASE and 2.5.4.RELEASE.

When a single Kafka consumer polls 6 records from a topic (3 partitions, 2 records per partition) and all of them fail (the @KafkaListener-annotated method throws a RuntimeException):

- The listener is invoked for 2 or 3 of the records (randomly); these are retried as expected (5 retries + 1 original failure) and then handed to the recoverer.
- For the 3rd or 4th record, roughly 3 retries are attempted, and after about 5 minutes (the default max.poll.interval.ms) the consumer disconnects from the broker, leaving the remaining 3 records unprocessed.
- Also, when 3 records are processed, only 2 offsets are committed, i.e. the next time the application starts, the listener is invoked again with the already-processed third record.

The configurations are below, and the behavior can also be reproduced with the JUnit test further down. Please let me know if I am missing something here! In the case of multiple failed records from different partitions, can we not use SeekToCurrentErrorHandler with a recoverer at all? Or is stateful retry with a RecoveryCallback the only option? (I tried that as well and ran into the same problem.)

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String,String>> kafkaListenerContainerFactory(ConsumerFactory<String,String> consumerFactory) throws IOException{
        ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setSyncCommits(true);
        factory.getContainerProperties().setSyncCommitTimeout(Duration.ofSeconds(2000));
        SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler(processingFailedErrorRecoverer(),new FixedBackOff(0L,5L));
        seekToCurrentErrorHandler.setCommitRecovered(true);
        factory.setErrorHandler(seekToCurrentErrorHandler);
        return factory;
    }

With RetryTemplate and RecoveryCallback:

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String,String>> kafkaListenerContainerFactory(ConsumerFactory<String,String> consumerFactory) throws IOException{
        ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setSyncCommits(true);
        factory.setRetryTemplate(new RetryTemplate()); // SimpleRetryPolicy: 3 total attempts by default
        factory.setStatefulRetry(true);
        factory.setRecoveryCallback(context -> {
            processingFailedErrorRecoverer().accept((ConsumerRecord<?,?>) context.getAttribute("record"),(Exception) context.getLastThrowable());
            return null;
        });

        factory.getContainerProperties().setSyncCommitTimeout(Duration.ofSeconds(2000));
        SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler(new FixedBackOff(0L,3L));
        seekToCurrentErrorHandler.setCommitRecovered(true);
        factory.setErrorHandler(seekToCurrentErrorHandler);
        return factory;
    }
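For reference, the bare new RetryTemplate() above falls back to spring-retry's SimpleRetryPolicy with three total attempts; a minimal sketch spelling out that default (illustrative only, not part of our actual configuration):

    import org.springframework.retry.policy.SimpleRetryPolicy;
    import org.springframework.retry.support.RetryTemplate;

    // Illustrative: what "new RetryTemplate()" above defaults to.
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3)); // 3 total attempts = 1 original call + 2 retries
    factory.setRetryTemplate(retryTemplate);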

Log statements when the consumer disconnects:

2.3.3.RELEASE ->

    2020-08-10 10:51:23.881 23 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=xyz-consumer-group] Member consumer-1-1a0978c4-9ae6-45b9-8d9d-f3ddee081df9 sending LeaveGroup request to coordinator (id: 2147482644 rack: null)
    2020-08-10 10:51:23.882 23 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=xyz-consumer-group] Unsubscribed all topics or patterns and assigned partitions

2.5.4.RELEASE ->

    2020-08-10 14:34:20.902 36 [kafka-coordinator-heartbeat-thread | xyz-consumer-group] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-xyz-consumer-group-1, groupId=xyz-consumer-group] Member consumer-xyz-consumer-group-1-8324f4e3-4ec3-4b34-b6af-c4ff01a0aa01 sending LeaveGroup request to coordinator (id: 2147482644 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
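The remedy that log line suggests maps to two consumer properties. A hedged tuning sketch (the values are illustrative, not a recommendation; props is the consumer-properties map from the config below):

    import org.apache.kafka.clients.consumer.ConsumerConfig;

    // Illustrative values only -- the two knobs the broker log above points at.
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600_000); // default 300000 (5 minutes)
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);         // default 500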

src/main/java

package com.orgname.gtb.cmng.kafka;
/**
 * @param <V> Original message type.
 * @param <T> Message type to be published.
 */
@Slf4j
public abstract class AbstractErrorRecoverer<V,T> implements BiConsumer<ConsumerRecord<?,?>,Exception> {
    private static final String LOGGER_NAME="ERRORHANDLER";
    private static final Logger LOGGER = LoggerFactory.getLogger(LOGGER_NAME);       
    private final KafkaTemplate<String,T> kafkaTemplate; 
    private final KafkaTemplate<String,byte[]> deserializationErrorRecoveryKafkaTemplate;

    protected AbstractErrorRecoverer(KafkaTemplate<String,T> kafkaTemplate,KafkaTemplate<String,byte[]> deserializationErrorRecoveryKafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
        this.deserializationErrorRecoveryKafkaTemplate=deserializationErrorRecoveryKafkaTemplate;
        log.info("Recoverer initialized with alertDispatcher and kafkaTemplate.");
    }
    

    @SuppressWarnings("unchecked")
    @Override
    public void accept(ConsumerRecord<?,?> consumerRecord,Exception e) {
        V original = (V) consumerRecord.value();
        // TODO Do other common things,like alerting etc. 
        List<Header> headers = this.enhanceHeaders(consumerRecord,e);
        
        DeserializationException deserEx = ListenerUtils.getExceptionFromHeader(consumerRecord,ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER,new LogAccessor(LOGGER_NAME));
        
        if(deserEx!=null){  
            ProducerRecord<String,byte[]> deserializationErrorRecord = new ProducerRecord<>(getDeserializationErrorRecoveryTopic(),consumerRecord.partition(),(String) consumerRecord.key(),deserEx.getData(),headers);
            if (deserializationErrorRecoveryKafkaTemplate.isTransactional() && !deserializationErrorRecoveryKafkaTemplate.inTransaction()) {
                deserializationErrorRecoveryKafkaTemplate.executeInTransaction(kafkaOperations -> {
                    this.publishDeserializationError(deserializationErrorRecord,kafkaOperations);
                    return null;
                });
            } else {
                publishDeserializationError(deserializationErrorRecord,deserializationErrorRecoveryKafkaTemplate);
            }
            }
        }
        else {
            T objectToPublish=messageToPublish(consumerRecord,e.getCause());
            ProducerRecord<String,T> pr = new ProducerRecord<>(getErrorTopic(),consumerRecord.partition(),(String) consumerRecord.key(),objectToPublish,headers); // partition and key mirrored from the source record, as in the deserialization branch

            if (kafkaTemplate.isTransactional() && !kafkaTemplate.inTransaction()) {
                kafkaTemplate.executeInTransaction(kafkaOperations -> {
                    this.publish(pr,kafkaOperations);
                    return null;
                });
            } else {
                publish(pr,kafkaTemplate);
            }
        }
    }

    private void publish(ProducerRecord<String,T> record,KafkaOperations<String,T> ops) {
        try {   
            ops.send(record).addCallback(stringTSendResult -> {
                log.debug("Successfully published message to dead letter topic");
            },ex -> {
                log.error("error publishing to ERROR-Topic",ex);
            });
        } catch (Exception e) {
            log.error("Error publishing to error-topic.",e);
        }
    }
    
    private void publishDeserializationError(ProducerRecord<String,byte[]> record,KafkaOperations<String,byte[]> ops) {
        try {
            ops.send(record).addCallback(stringTSendResult -> {
                log.debug("Successfully published message to deserialization recovery topic.");
            },ex -> {
                log.error("error publishing to deserialization recovery topic.",ex);
            });
        } catch (Exception e) {
            log.error("Error publishing to deserialization recovery topic.",e);
        }
    }
    
    
    private List<Header> enhanceHeaders(ConsumerRecord<?,?> record,Exception exception) {
        List<Header> headers = new ArrayList<>();
        headers.add(new RecordHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC,record.topic().getBytes(StandardCharsets.UTF_8)));
        headers.add(new RecordHeader(KafkaHeaders.DLT_ORIGINAL_PARTITION,ByteBuffer.allocate(4).putInt(record.partition()).array()));
        headers.add(new RecordHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET,ByteBuffer.allocate(8).putLong(record.offset()).array()));
        headers.add(new RecordHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP,ByteBuffer.allocate(8).putLong(record.timestamp()).array()));
        headers.add(new RecordHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE,record.timestampType().toString().getBytes(StandardCharsets.UTF_8)));
        headers.add(new RecordHeader(KafkaHeaders.DLT_EXCEPTION_FQCN,exception.getClass().getName().getBytes(StandardCharsets.UTF_8)));
        headers.add(new RecordHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE,String.valueOf(exception.getMessage()).getBytes(StandardCharsets.UTF_8))); // String.valueOf guards against a null getMessage()
        headers.add(new RecordHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE,this.getStackTraceAsString(exception).getBytes(StandardCharsets.UTF_8)));
        Header valDeserExceptionheader  =record.headers().lastHeader(ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER);
        if (valDeserExceptionheader != null) {
            headers.add(valDeserExceptionheader);
        }
        return headers;
    }

    private String getStackTraceAsString(Throwable cause) {
        StringWriter stringWriter = new StringWriter();
        PrintWriter printWriter = new PrintWriter(stringWriter,true);
        cause.printStackTrace(printWriter);
        return stringWriter.getBuffer().toString();
    }

    /**
     * @return  The error topic to which the notification should be sent.
     */
    protected abstract String getErrorTopic();
    
    /**
     * 
     * @return The error topic to which deserialization error should be sent.
     */
    protected abstract String getDeserializationErrorRecoveryTopic();
    
    /**
     * Receives the original consumer record and the throwable thrown by the listener.
     * Override this method to publish a different message (e.g. an enriched message) to the error topic.
     * By default the original message is returned and published unchanged.
     * @param originalConsumerRecord The original consumer record, as received by the listener.
     * @param t Throwable thrown by the listener.
     * @return The expected message to be published.
     */
    protected T messageToPublish(ConsumerRecord<?,?> originalConsumerRecord,Throwable t){
        return (T)originalConsumerRecord.value();
    }

}
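Since enhanceHeaders() encodes the original partition as a 4-byte int and the offset/timestamp as 8-byte longs, a consumer of the error topic must decode those headers the same way. A hypothetical helper (not part of the class above) could look like:

    import java.nio.ByteBuffer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.header.Header;
    import org.springframework.kafka.support.KafkaHeaders;

    // Hypothetical consumer-side helper: reverses the ByteBuffer.putLong()
    // encoding used for DLT_ORIGINAL_OFFSET in enhanceHeaders() above.
    static long originalOffset(ConsumerRecord<?,?> errorRecord) {
        Header header = errorRecord.headers().lastHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET);
        return ByteBuffer.wrap(header.value()).getLong();
    }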

src/test/java

package com.orgname.gtb.cmng.config;
@EnableKafka
@Configuration
@Slf4j
public class IntegrationTestConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    // start of config for kafkatemplate that publishes a message 
    @Bean
    public Map<String,Object> producerProps() {
        Map<String,Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
        
        return props;
    }

    @Bean
    public ProducerFactory<String,String> producerFactory() {
        return new DefaultKafkaProducerFactory(producerProps(),new StringSerializer(),new StringSerializer());
    }


    @Bean 
    public KafkaTemplate<String,String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
    //end of config for kafkatemplate that publishes a message
    
    // start of config for kafkatemplate that recovers deserialization errors
    @Bean
    public Map<String,Object> deserializationErrorProducerProps() {
        Map<String,Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,ByteArraySerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String,byte[]> deserializationErrorProducerFactory() {
        return new DefaultKafkaProducerFactory(deserializationErrorProducerProps());
    }


    @Bean 
    public KafkaTemplate<String,byte[]> deserializationErrorRecoveryKafkaTemplate() {
        return new KafkaTemplate<>(deserializationErrorProducerFactory());
    }
   // end of config for kafkatemplate that recovers deserialization errors
    
    // config for kafkatemplate that publishes to deadlettertopic.
    @Bean
    public KafkaTemplate<String,String> deadLetterKafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
    
    // consumers config
    @Bean
    public Map<String,Object> getConsumerProps() {
        Map<String,Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,15000);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,"group-id");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        return props;
    }

    @Bean
    DefaultKafkaConsumerFactory consumerFactory() {
        return new DefaultKafkaConsumerFactory(
                getConsumerProps(),new StringDeserializer(),new StringDeserializer()
        );
    }

    // config for the error handler and its publisher to the dead letter topic

    @Bean   // the error recoverer
    public StringErrorRecovererImplementation processingFailedErrorRecoverer() {
        return new StringErrorRecovererImplementation(deadLetterKafkaTemplate(),deserializationErrorRecoveryKafkaTemplate());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String,String>> kafkaListenerContainerFactory(ConsumerFactory<String,String> consumerFactory){
        ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setSyncCommits(true);
        factory.getContainerProperties().setSyncCommitTimeout(Duration.ofSeconds(2000));
        SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler(processingFailedErrorRecoverer(),new FixedBackOff(0L,5L));
        seekToCurrentErrorHandler.setCommitRecovered(true);
        factory.setErrorHandler(seekToCurrentErrorHandler);
        return factory;
    }
    
    // config for the listener on the happy topic
    @Bean
    @Primary
    public KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry() {
        KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry =
                new KafkaListenerEndpointRegistry();
        return kafkaListenerEndpointRegistry;
    }

    // the listener
    @Bean
    public IntegrationTestMessageListener simpleStringMessageListener() {
        return new IntegrationTestMessageListener(kafkaListenerEndpointRegistry());
    }
}

src/test/java

 package com.orgname.gtb.cmng.kafka.integrationtest;
    @RunWith(SpringRunner.class)
    @TestPropertySource(properties = {"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}"})
    @EmbeddedKafka(partitions = 3,topics = {"${topics.happy}","${topics.deadLetter}"})
    @SpringBootTest(classes = {IntegrationTestConfig.class})
    public class ErrorRecovererIntegrationTest {
    
         private static final String BAD_MESSAGE = "Poison message";
    
        @Value("${topics.happy}")
        private String happyTopic;
    
        @Value("${topics.deadLetter}")
        private String deadLetterTopic;
    
        @Autowired
        private EmbeddedKafkaBroker embeddedKafka;
    
        @Autowired
        private ConsumerFactory<String,String> consumerFactory; // will use the consumer factory defined in IntegrationTestConfig
    
    
        @Autowired
        protected KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
    
        @Autowired
        private IntegrationTestMessageListener listener;
    
        private Consumer<String,String> deadLetterConsumer;
    
        @Autowired
        private KafkaTemplate<String,String> kafkaTemplate;
    
        @Before
        public void setUp() {
            embeddedKafka.brokerProperty("controlled.shutdown.enable",true);
    
            for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
                log.debug("Listener container props:\n{}",messageListenerContainer.getContainerProperties().toString());
                ContainerTestUtils.waitForAssignment(messageListenerContainer,embeddedKafka.getPartitionsPerTopic());
            }
            deadLetterConsumer = consumerFactory.createConsumer();
            deadLetterConsumer.subscribe(Collections.singleton(deadLetterTopic));
            deadLetterConsumer.poll(Duration.ofMillis(0));
        }
    
        @After
        public void tearDown() {
            listener.clear();
        }
    
        @Test
        @DirtiesContext
        public void given_bad_message_should_publish_to_dead_letter_topic() throws Exception {
            IntStream.range(0,6).forEach(i -> kafkaTemplate.send(happyTopic,i % 3,i+"",BAD_MESSAGE));
            Thread.sleep(5000);
            ConsumerRecords<String,String> consumerRecords= KafkaTestUtils.getRecords(deadLetterConsumer);
            assertEquals(6,consumerRecords.count());
        }
    }
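As an aside, the fixed Thread.sleep(5000) can make this assertion flaky; KafkaTestUtils.getRecords also has an overload that takes an explicit timeout, so one possible alternative (a sketch, with an arbitrary 30-second bound) is:

    // Sketch: wait up to 30 seconds for the six dead-letter records instead of sleeping.
    ConsumerRecords<String,String> consumerRecords =
            KafkaTestUtils.getRecords(deadLetterConsumer, 30_000);
    assertEquals(6, consumerRecords.count());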

src/test/java

package com.db.orgname.cmng.kafka.integrationtest;
/**
 * This listener will listen for "poison messages" and throw a runtime exception so the exception handling can be done.
 */
@Service
@Slf4j
public class IntegrationTestMessageListener {

    @Getter
    private final KafkaListenerEndpointRegistry registry;

    @Getter
    private Map<String,String> messages = new HashMap<>();

    public void clear() {
        messages.clear();
    }

    @Autowired
    public IntegrationTestMessageListener(KafkaListenerEndpointRegistry registry) {
        log.debug("Created simple listener");
        this.registry = registry;
    }

    @KafkaListener(topics = "${topics.happy}")
    public void listen(@Payload String value,@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
        log.info("Simple listener received message --  key: {},value: {}",key,value);
        if (value.toLowerCase().startsWith("poison")) {
            throw new RuntimeException("failed");
        } else {
            messages.put(key,value);
        }

    }
}

src/test/java

package com.orgname.gtb.cmng.kafka.integrationtest;
@Getter
@Service
public class StringErrorRecovererImplementation extends AbstractErrorRecoverer<String,String> {

    public StringErrorRecovererImplementation(KafkaTemplate<String,String> kafkaTemplate,KafkaTemplate<String,byte[]> deserializationErrorRecoveryKafkaTemplate) {
        super(kafkaTemplate,deserializationErrorRecoveryKafkaTemplate);
    }

    @Override
    protected String getErrorTopic() {
        return "T-ERROR-TOPIC";
    }

    @Override
    protected String messageToPublish(ConsumerRecord<?,?> original,Throwable t) {
        String originalString=(String)original.value();
        return originalString + t.getMessage();
    }

    @Override
    protected String getDeserializationErrorRecoveryTopic() {
        return "T-DESERIALIZATION-ERROR-TOPIC";
    }
}

src/test/resources/application.yml

topics:
  happy: T-HAPPY-TOPIC
  deadLetter: T-ERROR-TOPIC
  deserializationError: T-DESERIALIZATION-ERROR-TOPIC
spring:
  kafka:
    consumer:
      enable-auto-commit: false
      auto-offset-reset: latest
    producer:
      acks: all

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.orgname.gtb.cmng</groupId>
    <artifactId>nextgen-commons-error-handler</artifactId>
    <version>0.1.1-SNAPSHOT</version>
    <name>nextgen-commons-error-handler</name>
    <description>nextgen commons error handler</description> <!--fixme: Add proper description-->

    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <sonar.language>java</sonar.language>
        <lombok.version>1.18.8</lombok.version>

        <!--Test Dependencies-->
        <confluent.version>5.4.0</confluent.version>
        <mockito-core.version>2.9.0</mockito-core.version>
        <mockito-all.version>1.9.5</mockito-all.version>
        <junit.version>4.13</junit.version>
        <assertj-core.version>3.13.2</assertj-core.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
            <version>${lombok.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.5.4.RELEASE</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.9.1</version>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-databind</artifactId>
                </exclusion>
            </exclusions>             
        </dependency>
        <!--Test Dependencies-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>2.5.4.RELEASE</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${confluent.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-schema-registry-client</artifactId>
            <version>${confluent.version}</version>
            <scope>test</scope>
            <exclusions>
              <exclusion>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
              </exclusion>
              <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-annotations</artifactId>
                </exclusion>
            </exclusions> 
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>common-config</artifactId>
            <version>${confluent.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>common-utils</artifactId>
            <version>${confluent.version}</version>
            <scope>test</scope>
        </dependency>
        <!-- Test dependencies -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <version>2.3.2.RELEASE</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>${assertj-core.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.jacoco</groupId>
                <artifactId>jacoco-maven-plugin</artifactId>
                <version>0.8.3</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>prepare-agent</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>report</id>
                        <phase>prepare-package</phase>
                        <goals>
                            <goal>report</goal>
                        </goals>
                        <!--TODO-changeme: Change the exclusions based on individual project requirements-->
                        <configuration>
                            <excludes>
                                <exclude>**/entities/*.class</exclude>
                                <exclude>**/avro/*.class</exclude>
                            </excludes>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>sonar-maven-plugin</artifactId>
                <version>3.7.0.1746</version>
            </plugin>
        </plugins>
    </build>

</project>

Answer

Please provide a project that exhibits this behavior; I cannot reproduce it. Everything works as expected with this application:

@SpringBootApplication
public class So63349172Application {

    public static void main(String[] args) {
        SpringApplication.run(So63349172Application.class,args);
    }

    @KafkaListener(id = "so63349172",topics = "so63349172")
    public void listen(String in) {
        System.out.println(in);
        throw new RuntimeException("test");
    }

    @Bean
    ErrorHandler eh() {
        return new SeekToCurrentErrorHandler(
                (rec,ex) -> System.out.println("Recovered " + ListenerUtils.recordToString(rec,true)),new FixedBackOff(0,2)) {

            @Override
            public void handle(Exception thrownException,List<ConsumerRecord<?,?>> records,Consumer<?,?> consumer,MessageListenerContainer container) {

                System.out.println("Failed " + ListenerUtils.recordToString(records.get(0),true));
                super.handle(thrownException,records,consumer,container);
            }

        };
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so63349172").partitions(3).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String,String> template) {
        return args -> {
            IntStream.range(0,6).forEach(i -> template.send("so63349172",i % 3,null,"foo"));
        };
    }

}

I see nothing in your configuration that would cause a rebalance.

Here are the results of my test (3-broker cluster). With FixedBackOff(0, 2), each record is attempted 3 times (the original call plus 2 retries) and then handed to the recoverer, partition by partition...

$ egrep '^(Failed|Recovered)' ../tmp/gg
Failed so63349172-1@0
Failed so63349172-2@0
Failed so63349172-0@0
Failed so63349172-1@0
Failed so63349172-2@0
Failed so63349172-0@0
Failed so63349172-1@0
Recovered so63349172-1@0
Failed so63349172-2@0
Recovered so63349172-2@0
Failed so63349172-0@0
Recovered so63349172-0@0
Failed so63349172-1@1
Failed so63349172-2@1
Failed so63349172-0@1
Failed so63349172-1@1
Failed so63349172-2@1
Failed so63349172-0@1
Failed so63349172-1@1
Recovered so63349172-1@1
Failed so63349172-2@1
Recovered so63349172-2@1
Failed so63349172-0@1
Recovered so63349172-0@1
