如何在kafka中正确实现指数重试?

问题描述

我是 Kafka 的初学者。
我一直在尝试对 Kafka 消费者中的失败记录实施指数重试。重试 4 次后,消费者需要关闭。重试应在 1 分钟、5 分钟、15 分钟和 30 分钟后再次进行。经过所有这些尝试,如果重试不成功,那么我需要关闭使用者。 我已经做了以下来实现它。但 5 分钟后(max.poll.interval),消费者重新平衡。如何完成所有重试尝试(失败时尝试 5 次),然后关闭消费者?

        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setinitialInterval(60000);
        backOffPolicy.setMultiplier(5);
        backOffPolicy.setMaxInterval(900000);
        
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(retryPolicy());
        retryTemplate.setBackOffPolicy(backOffPolicy);
        return retryTemplate;
    }

    private RetryPolicy retryPolicy() {
        Map<Class<? extends Throwable>,Boolean> exceptionMap = new HashMap<>();
        exceptionMap.put(IllegalArgumentException.class,false);
        exceptionMap.put(RecoverableDataAccessException.class,true);
        SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(4,exceptionMap,true);
        return simpleRetryPolicy;
    }

解决方法

max.poll.interval 是 kafka 集群在将其视为“死”之前等待来自消费者的轮询调用的时间上限。一旦消费者组的其中一个消费者被认为已死亡,kafka 集群将触发对该消费者组的重新平衡。

在您的用例中,您希望在进行后续轮询之前等待 >51 分钟(重试时间 + 实际处理时间)。因此,您需要将此属性增加到足够的值(>51 分钟),以便 kafka 集群不会假设消费者已经死亡。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...