问题描述
Flux.range(1,100).delayElements(Duration.ofSeconds(10)).subscribe(i-> System.out.println(i));
当 Flux emiter 到 10 时,我想延迟元素到 1 分钟到这个订阅 背景是当我从 mongoDB 读取一些数据并写入 Elasticsearch 时, 但我想动态控制读取速度,又不想耗尽mongodb资源。
Flux<List<Document>> readFromMongoDB = getFromMongoDB();
product_2014.subscribe(new BaseSubscriber<List<Document>>() {
int counter;
@Override
protected void hookOnSubscribe(Subscription subscription) {
subscription.request(1);
}
@SneakyThrows
@Override
protected void hookOnNext(List<Document> value) {
Thread.sleep(1000);
if (counter == 1) {
counter = 0;
}
else {
counter++;
}
upstream().request(0);
upstream().cancel();
log.info("aaaaaaaaaaa");
}
});
ParallelFlux<List<Document>> getFromMongoDB(String product,int size,MongoDatabase mongoDatabase,int parallel,Duration duration) {
Publisher<Long> publisher = mongoDatabase.getCollection(product)
.countDocuments();
Mono<Long> count = Mono.from(publisher);
return count.flatMapMany(l -> {
log.info("split counter");
return Flux.range(0,(int) (l / size) + 1);
})
.log().doOnSubscribe(subscription -> {
log.info("doOnSubscribe");
})
.parallel(parallel,1)
.runOn(scheduler)
.doOnNext(integer -> log.info("get page = {}",integer))
.concatMap(page -> {
log.info("page in {}",page);
FindPublisher<Document> limit =
mongoDatabase.getCollection(product)
.find(Document.class)
.skip(page * size)
.limit(size);
Mono<List<Document>> listDocument = Flux.from(limit)
.publishOn(scheduler)
.collectList()
.doOnNext(list -> {
log.info("{} in list",page);
});
return listDocument;
});
}
为什么我可以取消订阅。而且 Flux 仍然发出页面元素? 我该怎么办? 想并行从mongodb读取,动态控制读取速度。
解决方法
我只看到这样:
Flux.range(1,100)
.flatMap(el -> {
Mono<Integer> elMono = Mono.just(el);
if (el <= 10) {
return elMono.delayElement(Duration.ofMinutes(1));
}
else {
return elMono.delayElement(Duration.ofSeconds(10));
}
})
.subscribe(i-> System.out.println(i));
您检查每个元素并决定如何延迟这个元素。