Reactor-如何将以id为键的找到和未找到的实体收集到地图中?

问题描述

想象一下,我将用户ID列表上传到服务器中以查找它们;我想知道哪些ID在数据库中找不到,所以我会知道我的db数据本地缓存,或者我的真实数据已损坏。缺少ID可用作UI和日志中的反馈。

我想构造一个映射,其中键作为输入ID,值作为在缓存/数据库中找到的实体用户。如果未找到,则值为null。但是,如果ReactiveCrudRepository不返回任何内容,则findById(id)将返回Mono.empty(),并且因此映射将不包括未找到的实体。在测试中我得到了错误

Expected: is <true>
     but: was <false>
java.lang.AssertionError: 
Expected: is <true>
     but: was <false>

我正在做

return Flux.fromIterable(ids)
            .concatMap(repository::findById) // <--------------- this step only gets found entities
            .collectMap(Customer::getId); // <------------------ so here I get only existent elements(in test,a map with 2 elements)

关键是要知道找到了什么ID,没有找到了什么。如何在Reactor 3中以其他方式执行此操作?

测试:

    @Test
    void findAllByIdAndMissing() {
        // given
        Customer customer1 = Customer.builder().id("id1").build();
        Customer customer2 = Customer.builder().id("id2").build();
        // given mocked
        when(cache.getAllPresent(ArgumentMatchers.any())).thenReturn(Collections.emptyMap());
        when(repository.findById("id1")).thenReturn(Mono.just(customer1));
        when(repository.findById("id2")).thenReturn(Mono.just(customer2));
        when(repository.findById("id3")).thenReturn(Mono.empty());

        // when
        Map<String,Customer> actual = service.findAllByIdAndMissing(Set.of("id3","id2","id1")).block();

        // then
        assertthat(actual.keySet().contains("id1"),is(true));
        assertthat(actual.keySet().contains("id2"),is(true));
        assertthat(actual.keySet().contains("id3"),is(true));
    }

如果我添加log()

:08:03.072 [Test worker] INFO reactor.Flux.Iterable.1 - | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
13:08:03.078 [Test worker] INFO reactor.Flux.FlatMap.2 - onSubscribe(FluxFlatMap.FlatMapMain)
13:08:03.080 [Test worker] INFO reactor.Flux.FlatMap.2 - request(unbounded)
13:08:03.081 [Test worker] INFO reactor.Flux.Iterable.1 - | request(256)
13:08:03.082 [Test worker] INFO reactor.Flux.Iterable.1 - | onNext(id1)
13:08:03.084 [Test worker] INFO reactor.Flux.FlatMap.2 - onNext(Customer(super=Auditable(super=Customer@196b1,createdAt=null,createdBy=null,modifiedAt=null,modifiedBy=null),schemaVersion=1,internalVersion=null,id=id1))
13:08:03.145 [Test worker] INFO reactor.Flux.Iterable.1 - | onNext(id2)
13:08:03.146 [Test worker] INFO reactor.Flux.FlatMap.2 - onNext(Customer(super=Auditable(super=Customer@196b2,id=id2))
13:08:03.147 [Test worker] INFO reactor.Flux.Iterable.1 - | onNext(id3)
13:08:03.148 [Test worker] INFO reactor.Flux.Iterable.1 - | onComplete()
13:08:03.149 [Test worker] INFO reactor.Flux.FlatMap.2 - onComplete()

解决方法

不确定是否有更惯用的方法来做到这一点,但是您可以通过一个辅助类轻松实现此目的:

Flux.fromIterable(ids)
    .concatMap(id -> repository.findById(id).map(Result::found).defaultIfEmpty(Result.notFound(id)))
    .collectMap(Result::getId,Result::getCustomer);


@Data
@AllArgsConstructor(access = AccessLevel.PRIVATE)
class Result {
    private final String id;
    private final Customer customer;

    public static Result found(Customer customer) {
        return new Result(customer.getId(),customer);
    }

    public static Result notFound(String id) {
        return new Result(id,null);
    }
}