如何在反应式编程的后续步骤中实现并行和等待响应

问题描述

我正在尝试复现我们在生产环境中遇到的一个场景:其中一部分步骤可以并行执行,而另一部分步骤必须等待前面步骤的响应返回之后才能继续执行。

下面是代码的副本。

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static java.lang.Integer.sum;

/**
 * Demo that runs the same five-step computation three ways and prints how long each takes:
 * sequentially, with {@link CompletableFuture}, and with Project Reactor.
 *
 * <p>The first three steps ({@code firstMethod}, {@code secondMethod}, {@code thirdMethod}) are
 * independent of each other. {@code divide} and {@code multiply} both need the results of the
 * first stage, but are independent of each other.
 *
 * <p>NOTE(review): {@code Mono}, {@code Scheduler} and {@code Schedulers} are Project Reactor
 * types (reactor.core.publisher / reactor.core.scheduler); the matching imports are not visible
 * in this file and must be present for the class to compile.
 */
public class Traditional {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Traditional traditional = new Traditional();

        // The sequential run was commented out, so the "sync" timing measured an empty interval.
        Instant start = Instant.now();
        traditional.executeMethod();
        Instant end = Instant.now();
        System.out.println("Time taken to execute sync :: " + Duration.between(start, end));

        start = Instant.now();
        traditional.executeMethodAsync();
        end = Instant.now();
        System.out.println("Time taken to execute async :: " + Duration.between(start, end));

        start = Instant.now();
        traditional.executeMethodReactive();
        end = Instant.now();
        System.out.println("Time taken to execute Reactively:: " + Duration.between(start, end));
    }

    /**
     * Reactive variant: runs the three first-stage methods concurrently, then runs
     * {@code divide} and {@code multiply} concurrently on their combined results.
     *
     * @return the quotient plus the product, or {@code null} if the pipeline completes empty
     */
    private Integer executeMethodReactive() {
        // The work is blocking (sleep), so use the scheduler intended for blocking tasks.
        Scheduler scheduler = Schedulers.boundedElastic();

        // fromCallable defers the call until subscription; Mono.just(firstMethod()) would run
        // the method immediately on the calling thread, making subscribeOn pointless.
        Mono<Integer> a = Mono.fromCallable(this::firstMethod).subscribeOn(scheduler);
        Mono<Integer> b = Mono.fromCallable(this::secondMethod).subscribeOn(scheduler);
        Mono<Integer> c = Mono.fromCallable(this::thirdMethod).subscribeOn(scheduler);

        Mono<Integer> externalMono = Mono.zip(a, b, c).flatMap(data -> {
            Integer total = sum(data.getT1(), data.getT2());
            Integer diff = diff(data.getT3(), data.getT1());

            Mono<Integer> quotient = Mono.fromCallable(() -> divide(total, diff)).subscribeOn(scheduler);
            Mono<Integer> product = Mono.fromCallable(() -> multiply(total, diff)).subscribeOn(scheduler);

            return Mono.zip(quotient, product).map(pair -> pair.getT1() + pair.getT2());
        });

        Integer value = externalMono.block(Duration.ofMinutes(1));
        System.out.println(value);
        return value;
    }

    /**
     * Sequential baseline: every step runs one after another on the calling thread.
     *
     * @return the quotient plus the product
     */
    private int executeMethod() {
        int a = firstMethod();
        int b = secondMethod();
        int c = thirdMethod();

        int total = sum(a, b);
        int diff = diff(c, a);

        int quotient = divide(total, diff);
        int product = multiply(total, diff);

        int value = quotient + product;
        System.out.println(value);
        return value;
    }

    /**
     * {@link CompletableFuture} variant: the first three steps run concurrently, then
     * {@code divide} and {@code multiply} run concurrently on their combined results.
     *
     * @return the quotient plus the product
     * @throws ExecutionException if any of the asynchronous steps fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private Integer executeMethodAsync() throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(this::firstMethod);
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(this::secondMethod);
        CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(this::thirdMethod);

        // get() blocks until each future is done; all three are already running concurrently.
        Integer first = future1.get();
        Integer total = sum(first, future2.get());
        Integer diff = diff(future3.get(), first);

        CompletableFuture<Integer> future4 = CompletableFuture.supplyAsync(() -> divide(total, diff));
        CompletableFuture<Integer> future5 = CompletableFuture.supplyAsync(() -> multiply(total, diff));

        // thenCombine propagates a failure of either future instead of turning it into null.
        CompletableFuture<Integer> finalTotal = future4.thenCombine(future5, Integer::sum);

        Integer result = finalTotal.get();
        System.out.println("Final Total :: " + result);
        return result;
    }

    /** Simulates a 4-second operation, then returns {@code total / diff}. */
    private int divide(Integer total, Integer diff) {
        sleepSeconds(4);
        System.out.println("divide");
        return total / diff;
    }

    /** Simulates a 5-second operation, then returns {@code total * diff}. */
    private int multiply(Integer total, Integer diff) {
        sleepSeconds(5);
        System.out.println("multiply");
        return total * diff;
    }

    /** Returns {@code a - b}. */
    private Integer diff(Integer a, Integer b) {
        return a - b;
    }

    /** Simulates a 1-second operation, then returns 1. */
    private int firstMethod() {
        sleepSeconds(1);
        System.out.println("1");
        return 1;
    }

    /** Simulates a 5-second operation, then returns 2. */
    private int secondMethod() {
        sleepSeconds(5);
        System.out.println("2");
        return 2;
    }

    /** Simulates a 3-second operation, then returns 3. */
    private int thirdMethod() {
        sleepSeconds(3);
        System.out.println("3");
        return 3;
    }

    /** Sleeps for the given number of seconds; on interruption restores the interrupt flag. */
    private static void sleepSeconds(long seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}

程序的顺序执行需要 18 秒,异步执行需要 11 秒,我试图在反应式编程中实现相同的目标。如何实现这一目标?

更新

根据 Martin 提供的线索,我将该方法重写如下,现在运行正常:

/**
 * Reactive variant: runs {@code firstMethod}, {@code secondMethod} and {@code thirdMethod}
 * concurrently, then runs {@code divide} and {@code multiply} concurrently on the combined
 * results and returns the sum of both.
 *
 * @return the quotient plus the product, or {@code null} if the pipeline completes empty
 */
private Integer executeMethodReactive() {
    // The wrapped methods block (sleep), so use the scheduler intended for blocking tasks.
    Scheduler scheduler = Schedulers.boundedElastic();

    // fromCallable defers each call until subscription, so subscribeOn decides where it runs.
    Mono<Integer> first = Mono.fromCallable(this::firstMethod).log("first").subscribeOn(scheduler);
    Mono<Integer> second = Mono.fromCallable(this::secondMethod).log("second").subscribeOn(scheduler);
    Mono<Integer> third = Mono.fromCallable(this::thirdMethod).log("third").subscribeOn(scheduler);

    Mono<Integer> externalMono = Mono.zip(first, second, third).flatMap(data -> {
        // total is first + second (not first + first); diff is third - first.
        Integer total = sum(data.getT1(), data.getT2());
        Integer diff = diff(data.getT3(), data.getT1());

        Mono<Integer> quotient = Mono.fromCallable(() -> divide(total, diff)).log("divide").subscribeOn(scheduler);
        Mono<Integer> product = Mono.fromCallable(() -> multiply(total, diff)).log("multiply").subscribeOn(scheduler);

        return Mono.zip(quotient, product).map(inner -> inner.getT1() + inner.getT2());
    });

    return externalMono.block();
}

解决方法

暂未找到可以解决该程序问题的有效方法,小编正在努力寻找和整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)