问题描述
我是 Kafka 的初学者。
我一直在尝试对 Kafka 消费者中的失败记录实施指数重试。重试 4 次后,消费者需要关闭。重试应在 1 分钟、5 分钟、15 分钟和 30 分钟后再次进行。经过所有这些尝试,如果重试不成功,那么我需要关闭使用者。
我已经做了以下来实现它。但 5 分钟后(max.poll.interval),消费者重新平衡。如何完成所有重试尝试(失败时尝试 5 次),然后关闭消费者?
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(60000);
backOffPolicy.setMultiplier(5);
backOffPolicy.setMaxInterval(900000);
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(retryPolicy());
retryTemplate.setBackOffPolicy(backOffPolicy);
return retryTemplate;
}
private RetryPolicy retryPolicy() {
Map<Class<? extends Throwable>,Boolean> exceptionMap = new HashMap<>();
exceptionMap.put(IllegalArgumentException.class,false);
exceptionMap.put(RecoverableDataAccessException.class,true);
SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(4,exceptionMap,true);
return simpleRetryPolicy;
}
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)