如何在 Java 中制作异步生产者/源?

问题描述

我有一个需要方法处理的任务列表,这个过程需要很长时间,所以我希望列表中的任务在异步模式下被一个一个处理,并将结果作为一个异步流,因此下游处理不需要等待整个任务列表完成:

AsyncStream<R> methodA(List<T> tasks){tasks.forEach(t -> {calculation that takes a long time})}

在网上简单搜索了一下,发现 RxJava 可以处理异步流数据,但是介绍中好像解释了如何创建异步数据流。那么如何在 Java 中创建异步生产者/源?

解决方法

您可以创建异步 Observable,它会在给定任务的计算完成后立即发出值。为此,您将需要 flatMap 运算符。在一个简化的例子中,这看起来像:

static Observable<String> methodA(List<String> tasks) {
     return Observable.from(tasks)
            .flatMap(t -> Observable.just(t)
                    .map(t1 -> longRunningTask(t1))
                    .subscribeOn(Schedulers.io())
            );

}

static String longRunningTask(String arg) {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    return arg;
}

您将您的任务映射到 Observable 并使用 subscribeOn,以便当某些内容订阅它们时,订阅发生在不同的线程上。 flatMap 运算符一次订阅所有这些 Observables 并在它们准备好后立即发出值。计算是异步的,因为订阅发生在来自 Scedulers.io 池的不同线程中。