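Problem description
I have a use case where I want to keep receiving records from Kafka indefinitely and process each one with processRecord(String record), which can throw a RuntimeException. I want to retry a fixed number of times (say 5). If processing succeeds before the 5 retries are used up, I want to commit the offset manually and move on to the next record. If it does not, I want to log the record, commit its offset, and then move on to the next record. I have some code, but it does not seem to work correctly. Any help would be appreciated.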
public class MyClass {

    private final AtomicInteger atomicInteger = new AtomicInteger(0);
    private final ReceiverOptions<String, String> receiverOptions = getReceiverOptions();

    public void consumeRecords() {
        RetryBackoffSpec retrySpec = Retry.backoff(5, Duration.ofSeconds(1)).transientErrors(true);
        KafkaReceiver.create(receiverOptions)
                .receive()
                .doOnNext(record -> {
                    System.out.println(record.value());
                    processRecord(record.value());
                })
                .doOnError(e -> System.out.println(atomicInteger.incrementAndGet()))
                .onErrorContinue((e, r) -> {
                    System.out.println(atomicInteger.incrementAndGet());
                    System.out.println("Record: " + r);
                    System.out.println("Error: " + e);
                })
                .retryWhen(retrySpec)
                .repeat()
                .subscribe();
    }

    public void processRecord(String record) {
        // might throw an exception
        throw new RuntimeException("Throwing exception!");
    }
}
The output I get is:
some message
1
Record: ConsumerRecord(topic = my-topic,partition = 0,leaderEpoch = null,offset = 1,CreateTime = 1620062099518,serialized key size = -1,serialized value size = 12,headers = RecordHeaders(headers = [],isReadOnly = false),key = null,value = some message)
Error: java.lang.RuntimeException: Throwing exception!
second message
1
Record: ConsumerRecord(topic = my-topic,partition = 1,offset = 2,CreateTime = 1620062166706,serialized value size = 14,value = second message)
Error: java.lang.RuntimeException: Throwing exception!
It does not retry 5 times, and the AtomicInteger is not updated for the second record.
What I want to achieve is:
count = 0
while (count < 5) {
if (exception) count++;
else break_and_continue_with_next_record
}
if (count == 5) log_failure_and_continue_with_next_record
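For reference, the pseudocode above can be written as a plain, blocking Java loop. This is only a sketch of the intended semantics with no Kafka or Reactor involved: `RetryLoopSketch`, `processWithRetries`, and the `failureLog` list are made-up names, and "committing" is represented by simply adding the record to a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class RetryLoopSketch {

    static final int MAX_ATTEMPTS = 5; // the "count < 5" bound from the pseudocode

    /**
     * Tries to process one record up to MAX_ATTEMPTS times.
     * Returns true on success; on exhaustion it records the failure and returns false.
     * Either way the caller is expected to commit and move on to the next record.
     */
    static boolean processWithRetries(String record, Consumer<String> processor, List<String> failureLog) {
        int count = 0;
        while (count < MAX_ATTEMPTS) {
            try {
                processor.accept(record);
                return true;           // break and continue with the next record
            } catch (RuntimeException e) {
                count++;               // failed attempt, try again
            }
        }
        failureLog.add(record);        // count == 5: log the failure
        return false;
    }

    public static void main(String[] args) {
        List<String> committed = new ArrayList<>();
        List<String> failureLog = new ArrayList<>();
        Consumer<String> processor = r -> {
            if (r.equals("fail")) {
                throw new RuntimeException("Throwing exception!");
            }
        };
        for (String record : List.of("foo", "fail", "baz")) {
            processWithRetries(record, processor, failureLog);
            committed.add(record);     // stands in for committing the offset
        }
        System.out.println("committed=" + committed); // committed=[foo, fail, baz]
        System.out.println("failed=" + failureLog);   // failed=[fail]
    }
}
```

The reactive pipeline has to reproduce this behavior without blocking, which is where the difficulty in the question comes from.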
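Solution
onErrorResume() is preferable to onErrorContinue().
The catch is that you cannot commit the offset there, because the receiver is no longer active at that point.
This works for me...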
    private final AtomicInteger atomicInteger = new AtomicInteger();

    public void consumeRecords(ReceiverOptions<String, String> receiverOptions) {
        RetryBackoffSpec retrySpec = Retry.backoff(5, Duration.ofSeconds(1)).transientErrors(true);
        KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);
        AtomicReference<ReceiverRecord<?, ?>> failed = new AtomicReference<>();
        receiver.receive()
                .subscribeOn(Schedulers.single())
                .doOnNext(record -> {
                    System.out.println(record.value() + "@" + record.offset());
                    if (failed.get() != null) {
                        // redelivery of the record whose retries were exhausted: just commit it
                        System.out.println("Committing failed record offset " + record.value()
                                + "@" + record.offset());
                        record.receiverOffset().acknowledge();
                        failed.set(null);
                    }
                    else {
                        atomicInteger.set(0);
                        try {
                            processRecord(record.value());
                            record.receiverOffset().acknowledge();
                        }
                        catch (Exception e) {
                            throw new ReceiverRecordException(record, e);
                        }
                    }
                })
                .doOnError(ex -> atomicInteger.incrementAndGet())
                .retryWhen(retrySpec)
                .onErrorResume(e -> {
                    // the retry-exhausted error wraps the last ReceiverRecordException
                    ReceiverRecordException ex = (ReceiverRecordException) e.getCause();
                    ReceiverRecord<?, ?> record = ex.getRecord();
                    System.out.println("Retries exhausted for " + record.value()
                            + "@" + record.offset());
                    failed.set(record);
                    return Mono.empty();
                })
                .repeat()
                .subscribe();
    }

    public void processRecord(String record) {
        // might throw an exception
        throw new RuntimeException("Throwing exception!");
    }

}

@SuppressWarnings("serial")
class ReceiverRecordException extends RuntimeException {

    private final ReceiverRecord record;

    ReceiverRecordException(ReceiverRecord record, Throwable t) {
        super(t);
        this.record = record;
    }

    public ReceiverRecord getRecord() {
        return this.record;
    }

}
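To see why the `failed` marker is needed, here is a rough, Kafka-free simulation of the control flow above. It is only a sketch under assumptions: the in-memory list stands in for the topic, the `committed` index for the committed offset, and each pass of the outer loop for one resubscription (a retry or the `repeat()`). `RedeliverySketch`, `consume`, and the event strings are made-up names, not part of Reactor Kafka.

```java
import java.util.ArrayList;
import java.util.List;

class RedeliverySketch {

    /** Replays the retry / resume / repeat flow against an in-memory "topic". */
    static List<String> consume(List<String> topic, int maxRetries) {
        List<String> events = new ArrayList<>();
        int committed = 0;              // next offset to read after a resubscription
        boolean failedPending = false;  // plays the role of the "failed" AtomicReference
        int errorsInARow = 0;           // reset on success, like transientErrors(true)

        while (committed < topic.size()) {           // one pass = one (re)subscription
            try {
                for (int offset = committed; offset < topic.size(); offset++) {
                    String value = topic.get(offset);
                    if (failedPending) {
                        // the uncommitted record is redelivered first: commit it, do not process it
                        events.add("commit-failed " + value + "@" + offset);
                        failedPending = false;
                    } else {
                        events.add("process " + value + "@" + offset);
                        if (value.equals("fail")) {
                            throw new IllegalStateException("Throwing exception!");
                        }
                        errorsInARow = 0;
                    }
                    committed = offset + 1;          // acknowledge
                }
            } catch (IllegalStateException e) {
                errorsInARow++;
                if (errorsInARow > maxRetries) {     // retries exhausted: mark and resubscribe
                    events.add("exhausted " + topic.get(committed) + "@" + committed);
                    failedPending = true;
                    errorsInARow = 0;
                }
            }
        }
        return events;
    }

    public static void main(String[] args) {
        consume(List.of("foo", "bar", "fail", "baz"), 5).forEach(System.out::println);
    }
}
```

Because the failing record was never committed, it is the first record delivered after every resubscription. Without the marker, the pipeline would process it again and fail forever instead of skipping past it.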
Edit
Here is the complete application...
@SpringBootApplication
public class So67373188Application {

    private static final Logger log = LoggerFactory.getLogger(So67373188Application.class);

    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(So67373188Application.class, args);
        Thread.sleep(120_000);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so67373188").partitions(1).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner2() {
        return args -> {
            SenderOptions<String, String> so = SenderOptions.create(
                    Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class));
            KafkaSender<String, String> sender = KafkaSender.create(so);
            Disposable subscribed = sender.send(Flux.just(pr("foo"), pr("bar"), pr("fail"), pr("baz")))
                    .subscribe(result -> {
                        System.out.println(result.recordMetadata());
                    });
            Thread.sleep(5000);
            subscribed.dispose();
        };
    }

    @Bean
    public ApplicationRunner runner3(KafkaOperations<String, String> template) {
        return args -> {
            DeadLetterPublishingRecoverer dlpr = new DeadLetterPublishingRecoverer(template);
            ReceiverOptions<String, String> ro = ReceiverOptions.<String, String> create(
                    Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                            ConsumerConfig.GROUP_ID_CONFIG, "so67373188",
                            ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1,
                            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"))
                    .withKeyDeserializer(new StringDeserializer())
                    .withValueDeserializer(new StringDeserializer())
                    .addAssignListener(assignments -> log.info("Assigned: " + assignments))
                    .commitBatchSize(1)
                    .subscription(Collections.singletonList("so67373188"));
            consumeRecords(ro);
        };
    }

    private SenderRecord<String, String, String> pr(String value) {
        return SenderRecord.create("so67373188", 0, null, null, value, value + ".corr");
    }

    private final AtomicInteger atomicInteger = new AtomicInteger();

    public void consumeRecords(ReceiverOptions<String, String> receiverOptions) {
        RetryBackoffSpec retrySpec = Retry.backoff(5, Duration.ofSeconds(1)).transientErrors(true);
        KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);
        AtomicReference<ReceiverRecord<?, ?>> failed = new AtomicReference<>();
        receiver.receive()
                .subscribeOn(Schedulers.single())
                .doOnNext(record -> {
                    System.out.println(record.value() + "@" + record.offset());
                    if (failed.get() != null) {
                        System.out.println("Committing failed record offset " + record.value()
                                + "@" + record.offset());
                        record.receiverOffset().acknowledge();
                        failed.set(null);
                    }
                    else {
                        atomicInteger.set(0);
                        try {
                            processRecord(record.value());
                            record.receiverOffset().acknowledge();
                        }
                        catch (Exception e) {
                            throw new ReceiverRecordException(record, e);
                        }
                    }
                })
                .doOnError(ex -> atomicInteger.incrementAndGet())
                .retryWhen(retrySpec)
                .onErrorResume(e -> {
                    ReceiverRecordException ex = (ReceiverRecordException) e.getCause();
                    ReceiverRecord<?, ?> record = ex.getRecord();
                    System.out.println("Retries exhausted for " + record.value()
                            + "@" + record.offset());
                    failed.set(record);
                    return Mono.empty();
                })
                .repeat()
                .subscribe();
    }

    public void processRecord(String record) {
        // might throw an exception
        if (record.equals("fail")) {
            throw new RuntimeException("Throwing exception!");
        }
    }

}

@SuppressWarnings("serial")
class ReceiverRecordException extends RuntimeException {

    private final ReceiverRecord record;

    ReceiverRecordException(ReceiverRecord record, Throwable t) {
        super(t);
        this.record = record;
    }

    public ReceiverRecord getRecord() {
        return this.record;
    }

}
Result:
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.4.5)
so67373188-0@16
so67373188-0@17
so67373188-0@18
so67373188-0@19
foo@16
bar@17
fail@18
fail@18
fail@18
fail@18
fail@18
fail@18
Retries exhausted for fail@18
fail@18
Committing failed record offset fail@18
baz@19
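The six `fail@18` lines before "Retries exhausted" are the initial attempt plus the five retries from `Retry.backoff(5, Duration.ofSeconds(1))`. As a rough idea of the pauses between them, the sketch below lists the nominal delays, assuming the backoff doubles from the minimum on each retry and ignoring the random jitter and maximum-backoff cap that Reactor also applies. `BackoffScheduleSketch` and `nominalDelays` are made-up names for illustration only.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

class BackoffScheduleSketch {

    /** Nominal exponential delays: minBackoff * 2^i for retry i (jitter and cap ignored). */
    static List<Duration> nominalDelays(int maxRetries, Duration minBackoff) {
        List<Duration> delays = new ArrayList<>();
        for (int i = 0; i < maxRetries; i++) {
            delays.add(minBackoff.multipliedBy(1L << i));
        }
        return delays;
    }

    public static void main(String[] args) {
        List<Duration> delays = nominalDelays(5, Duration.ofSeconds(1));
        System.out.println(delays); // [PT1S, PT2S, PT4S, PT8S, PT16S]
    }
}
```

Under these assumptions the failing record holds up the partition for roughly half a minute before it is skipped, so the retry count and minimum backoff are worth tuning to how long a stall is acceptable.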