Problem description
public CompletableFuture<List<Long>> getFavourites(Long userId) {
    CompletableFuture<List<Long>> future = new CompletableFuture<>();
    OctoThriftCallback callback = new OctoThriftCallback(thriftExecutor);
    callback.addobserver(new OctoObserver() {
        @Override
        public void onSuccess(Object o) {
            future.complete((List<Long>) o);
        }

        @Override
        public void onFailure(Throwable throwable) {
            future.completeExceptionally(throwable);
        }
    });
    try {
        recommendAsyncService.getFavorites(userId, callback);
    } catch (TException e) {
        log.error("OctoCall RecommendAsyncService.getFavorites", e);
        // Fail the future as well, otherwise callers would wait on it forever.
        future.completeExceptionally(e);
    }
    return future;
}
It now returns a CompletableFuture. I then call it and do some further processing with Flux.
public Flux<Product> getRecommend(Long userId) throws InterruptedException, ExecutionException, TimeoutException {
    // do not like it
    List<Long> recommendList = wrapper.getRecommend(userId).get(2, TimeUnit.SECONDS);
    System.out.println(recommendList);
    return Flux.fromIterable(recommendList)
            .flatMap(id -> Mono.defer(() -> Mono.just(Product.builder()
                    .userId(userId)
                    .productId(id)
                    .productType((int) (Math.random() * 100))
                    .build())))
            .take(5)
            .publishOn(mdpScheduler);
}
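However, I would like to get a Flux from the getFavourites method so that I can use it in the getRecommend method, instead of blocking on get().
Alternatively, can you recommend a Flux API that converts List<Long> recommendList into a Flux<Long> recommendFlux?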
Solution
To convert a CompletableFuture<List<T>> into a Flux<T>, you can combine Mono#fromFuture with Mono#flatMapMany:
var future = new CompletableFuture<List<Long>>();
future.completeAsync(() -> List.of(1L,2L,3L,4L,5L),CompletableFuture.delayedExecutor(3,TimeUnit.SECONDS));
Flux<Long> flux = Mono.fromFuture(future).flatMapMany(Flux::fromIterable);
flux.subscribe(System.out::println);
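Applied to the question, the blocking get(2, TimeUnit.SECONDS) call could be replaced along these lines. This is only a sketch: fetchIds is a hypothetical stand-in for wrapper.getRecommend(userId), the class and method names are made up for illustration, and Mono#timeout is assumed to be an acceptable substitute for the timed get().

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class NonBlockingRecommend {

    // Hypothetical stand-in for the question's wrapper.getRecommend(userId).
    static CompletableFuture<List<Long>> fetchIds(long userId) {
        return CompletableFuture.supplyAsync(() -> List.of(1L, 2L, 3L, 4L, 5L, 6L, 7L));
    }

    // Bridges the future into a Flux without ever calling get().
    static Flux<Long> recommendIds(long userId) {
        // Mono.defer postpones the remote call until something subscribes.
        return Mono.defer(() -> Mono.fromFuture(fetchIds(userId)))
                .timeout(Duration.ofSeconds(2)) // stands in for get(2, TimeUnit.SECONDS)
                .flatMapMany(Flux::fromIterable)
                .take(5);
    }

    public static void main(String[] args) {
        // block() is used only so that this demo waits for the result before exiting.
        List<Long> ids = recommendIds(42L).collectList().block();
        System.out.println(ids);
    }
}
```

With this shape, getRecommend no longer needs to declare InterruptedException, ExecutionException or TimeoutException; a timeout or failure arrives as an error signal on the Flux instead.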
A List<T> received asynchronously in a callback can also be converted into a Flux<T> without going through a CompletableFuture.
You can use Mono#create directly with Mono#flatMapMany:
Flux<Long> flux = Mono.<List<Long>>create(sink -> {
    Callback<List<Long>> callback = new Callback<List<Long>>() {
        @Override
        public void onResult(List<Long> list) {
            sink.success(list);
        }

        @Override
        public void onError(Exception e) {
            sink.error(e);
        }
    };
    client.call("query", callback);
}).flatMapMany(Flux::fromIterable);
flux.subscribe(System.out::println);
Or simply use Flux#create and emit each element separately:
Flux<Long> flux = Flux.create(sink -> {
    Callback<List<Long>> callback = new Callback<List<Long>>() {
        @Override
        public void onResult(List<Long> list) {
            list.forEach(sink::next);
            // Signal completion; without it the Flux never terminates.
            sink.complete();
        }

        @Override
        public void onError(Exception e) {
            sink.error(e);
        }
    };
    client.call("query", callback);
});
flux.subscribe(System.out::println);
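For completeness, here is a self-contained sketch of the Flux#create variant that can be run as is. The Callback interface and the call method are hypothetical stand-ins for whatever callback-based client you actually have, not part of Reactor. Note the sink.complete() after the last element: operators such as collectList only produce a result once the Flux has terminated.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import reactor.core.publisher.Flux;

public class CallbackToFlux {

    // Hypothetical callback type, modelled on the snippets above.
    interface Callback<T> {
        void onResult(T result);

        void onError(Exception e);
    }

    // Hypothetical async client: delivers a fixed list from another thread.
    static void call(String query, Callback<List<Long>> callback) {
        CompletableFuture.runAsync(() -> callback.onResult(List.of(1L, 2L, 3L)));
    }

    static Flux<Long> queryAsFlux(String query) {
        return Flux.create(sink -> call(query, new Callback<List<Long>>() {
            @Override
            public void onResult(List<Long> list) {
                list.forEach(sink::next); // one onNext signal per element
                sink.complete();          // then terminate the stream
            }

            @Override
            public void onError(Exception e) {
                sink.error(e);
            }
        }));
    }

    public static void main(String[] args) {
        // block() is used only so that this demo waits for the result before exiting.
        System.out.println(queryAsFlux("query").collectList().block());
    }
}
```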