在 reactor-kafka 中寻求立即生效?

问题描述

尝试实现一个重试记录处理器,在出现瞬时故障时,它会返回到最后一条记录的偏移量,而不是在不确定的时间内进入内部重试循环,并且如果错误持续很长时间,就会冒消费者会话超时和重新平衡的风险够了。

        kafkaTemplate
            .receive()
...
            .concatMap(record -> {
                var postResponses = mapper.transform(record.value())
                                          .stream()
                                          .map(this::post)
                                          .toArray(Mono[]::new);
                return Mono.when(postResponses)
                           .doOnSuccess(__ -> record.receiverOffset().ackNowledge())
                           .onErrorResume(ex ->
                               kafkaTemplate.seek(record.receiverOffset().topicPartition(),record.offset())
                                            .then(Mono.delay(someRetryinterval).then())
                           );
            },0) // no prefetch!
            .subscribe();

将最大轮询记录设置为 1,在此代码开始之前,主题中有两条记录, Mono。当一直失败时,我看到它反复寻找 0 和 1,而不是只读取第一条记录,直到成功。

我错过了什么吗? 如何在不切换到阻塞 API 的情况下,完全序列化轮询和查找调用以根据需要重试次数来处理记录?

期待在 RK 中使用等效的 @KafkaListener 和 SeekToCurrentErrorHandler,但发现并非如此

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)