问题描述
我是反应式编程的新手。需要帮助以了解行为。
我想实现什么?
我想创建50个ID应为递增顺序的条目。如果db中存在ID为1的条目,则应创建ID为2的条目。
我当前的实现如下:
// create entry 50 times
void createEntries() {
LOGGER.info("going to create 50 entries);
Flux.range(1,50)
.flatMap(i -> createEntry(5))
.subscribe();
}
//method to create an entry in db with incremental id
private Mono<Integer> createEntry(long retryInterval) {
return (customrepository.findAllEntry()) //-->A db call which returns all entries flux<Entrys>
.map(entry -> entry.getEntryId())
.sort()
//get last existing entry id
.last(0)
//try to create the entry with new incremented id
.flatMap(id -> createEntry(id + 1,retryInterval));
}
private Mono<? extends Integer> createEntry(int newEntryId,long retryInterval) {
return saveEntry(newEntryId) //--> return Mono<Boolean> true if saved false if id already exists
.doOnNext(applied -> LOGGER.info("Successfully created entry with id: {} ? {} ",newEntryId,applied)) //--> Why this is called multiple times??
.flatMap(applied -> !applied
//applied false shows id already exists,so try again recursively with new incremented id
? createEntry(newEntryId + 1,retryInterval)
: Mono.just(newEntryId))
.doOnError(e -> LOGGER.warn("Error creating entry with id {} ? {} : ",e));
.retrywhen(Retry.anyOf(RuntimeException.class)
.exponentialBackoff(Duration.ofSeconds(retryInterval),Duration.ofSeconds(retryInterval))); //-->retry on creation if any exception
}
上述实现为我带来了意外的行为,信息记录器“成功创建ID为:” 的对象被多次调用相同的ID。但是,我希望仅一次调用它。
注意:即使我删除了retrywhen
,该行为也保持不变。
解决方法
最后,我在代码中找出了问题。问题出在下面的代码片段中
// create entry 50 times
void createEntries() {
LOGGER.info("going to create 50 entries);
Flux.range(1,50)
.flatMap(i -> createEntry(5))
.subscribe();
}
这是在saveEntry(newEntryId)
完成之前调用方法50次。我通过使用repeat
API来解决此问题,如下所示:
// create entry 50 times
void createEntries() {
LOGGER.info("going to create 50 entries);
createEntry(5).subscribe();
}
//method to create an entry in db with incremental id
private Flux<? extends Integer> createEntry(long retryInterval) {
return (customRepository.findAllEntry()) //-->A db call which returns all entries flux<Entrys>
.map(entry -> entry.getEntryId())
.sort()
//get last existing entry id
.last(0)
//try to create the entry with new incremented id
.flatMap(id -> createEntry(id + 1,retryInterval))
.repeat(49); //-->This fixes my issue will only be invoked 49 times again onComplete(). And hence will create 50 entries
}
private Mono<? extends Integer> createEntry(int newEntryId,long retryInterval) {
return saveEntry(newEntryId) //--> return Mono<Boolean> true if saved false if id already exists
.doOnNext(applied -> LOGGER.info("Successfully created entry with id: {} ? {} ",newEntryId,applied))
.flatMap(applied -> !applied
//applied false shows id already exists,so try again recursively with new incremented id
? createEntry(newEntryId + 1,retryInterval)
: Mono.just(newEntryId))
.doOnError(e -> LOGGER.warn("Error creating entry with id {} ? {} : ",e));
.retryWhen(Retry.anyOf(RuntimeException.class)
.exponentialBackoff(Duration.ofSeconds(retryInterval),Duration.ofSeconds(retryInterval))); //-->retry on creation if any exception
}