问题描述
尝试实现一个重试记录处理器,在出现瞬时故障时,它会返回到最后一条记录的偏移量,而不是在不确定的时间内进入内部重试循环,并且如果错误持续很长时间,就会冒消费者会话超时和重新平衡的风险够了。
kafkaTemplate
.receive()
...
.concatMap(record -> {
var postResponses = mapper.transform(record.value())
.stream()
.map(this::post)
.toArray(Mono[]::new);
return Mono.when(postResponses)
.doOnSuccess(__ -> record.receiverOffset().ackNowledge())
.onErrorResume(ex ->
kafkaTemplate.seek(record.receiverOffset().topicPartition(),record.offset())
.then(Mono.delay(someRetryinterval).then())
);
},0) // no prefetch!
.subscribe();
将最大轮询记录设置为 1,在此代码开始之前,主题中有两条记录, Mono。当一直失败时,我看到它反复寻找 0 和 1,而不是只读取第一条记录,直到成功。
我错过了什么吗? 如何在不切换到阻塞 API 的情况下,完全序列化轮询和查找调用以根据需要重试次数来处理记录?
期待在 RK 中使用等效的 @KafkaListener 和 SeekToCurrentErrorHandler,但发现并非如此
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)