如何调整mongodb的读取速度?

问题描述

        Flux.range(1,100).delayElements(Duration.ofSeconds(10)).subscribe(i-> System.out.println(i));

当 Flux emiter 到 10 时,我想延迟元素到 1 分钟到这个订阅 背景是当我从 mongoDB 读取一些数据并写入 Elasticsearch 时, 但我想动态控制读取速度,又不想耗尽mongodb资源。

Flux<List<Document>> readFromMongoDB = getFromMongoDB();

product_2014.subscribe(new BaseSubscriber<List<Document>>() {
            int counter;

            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                subscription.request(1);
            }

            @SneakyThrows
            @Override
            protected void hookOnNext(List<Document> value) {
                Thread.sleep(1000);
                if (counter == 1) {
                    counter = 0;
                }
                else {
                    counter++;
                }
                upstream().request(0);
                upstream().cancel();
                log.info("aaaaaaaaaaa");
            }
        });
ParallelFlux<List<Document>> getFromMongoDB(String product,int size,MongoDatabase mongoDatabase,int parallel,Duration duration) {
        Publisher<Long> publisher = mongoDatabase.getCollection(product)
                                                 .countDocuments();
        Mono<Long> count = Mono.from(publisher);
        return count.flatMapMany(l -> {
            log.info("split counter");
            return Flux.range(0,(int) (l / size) + 1);
        })
                    .log().doOnSubscribe(subscription -> {
                        log.info("doOnSubscribe");
                    })
                    .parallel(parallel,1)
                    .runOn(scheduler)
                    .doOnNext(integer -> log.info("get page  = {}",integer))
                    .concatMap(page -> {
                        log.info("page in {}",page);
                        FindPublisher<Document> limit =
                                mongoDatabase.getCollection(product)
                                             .find(Document.class)
                                             .skip(page * size)
                                             .limit(size);
                        Mono<List<Document>> listDocument = Flux.from(limit)
                                                                .publishOn(scheduler)
                                                                .collectList()
                                                                .doOnNext(list -> {
                                                                    log.info("{} in list",page);
                                                                });
                        return listDocument;
                    });
    }

为什么我可以取消订阅。而且 Flux 仍然发出页面元素? 我该怎么办? 想并行从mongodb读取,动态控制读取速度。

解决方法

我只看到这样:

       Flux.range(1,100)
                .flatMap(el -> {
                    Mono<Integer> elMono = Mono.just(el);
                    if (el <= 10) {
                        return elMono.delayElement(Duration.ofMinutes(1));
                    }
                    else {
                        return elMono.delayElement(Duration.ofSeconds(10));
                    }
                })
                .subscribe(i-> System.out.println(i));

您检查每个元素并决定如何延迟这个元素。