问题描述
我正在尝试复制我们在生产中面临的一个场景,其中包含并行性和等待某个部分完成的混合,这取决于响应。
下面是代码的副本。
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import static java.lang.Integer.sum;
public class Traditional {
public static void main(String[] args) throws ExecutionException,InterruptedException {
Traditional Traditional = new Traditional();
Instant start = Instant.Now();
// Traditional.executeMethod();
Instant end = Instant.Now();
System.out.println("TiMetaken to execute sync :: " + Duration.between(start,end));
start = Instant.Now();
Traditional.executeMethodAsync();
end = Instant.Now();
System.out.println("TiMetaken to execute async :: " + Duration.between(start,end));
start = Instant.Now();
Traditional.executeMethodReactive();
end = Instant.Now();
System.out.println("TiMetaken to execute Reactively:: " + Duration.between(start,end));
}
private Integer executeMethodReactive() {
Scheduler scheduler = Schedulers.parallel();
Mono<Integer> a = Mono.just(firstMethod());
Mono<Integer> b = Mono.just(secondMethod());
Mono<Integer> c = Mono.just(thirdMethod());
Mono<Integer> externalMono = Mono.zip(a,b,c).subscribeOn(scheduler).flatMap(data -> {
Integer total = sum(data.getT1(),data.getT2());
Integer diff = diff(data.getT3(),data.getT1());
return Mono.zip(Mono.just(divide(total,diff)).subscribeOn(scheduler),Mono.just(multiply(total,diff)).subscribeOn(scheduler))
.flatMap(innerzip -> Mono.just(innerzip.getT1() + innerzip.getT2()));
});
Integer value = externalMono.block(Duration.ofMinutes(1));
System.out.println(value);
return value;
}
private int executeMethod() {
int a = firstMethod();
int b = secondMethod();
Integer c = thirdMethod();
Integer total = sum(a,b);
Integer diff = diff(c,a);
Integer d = divide(total,diff);
Integer multi = multiply(total,diff);
int value = d + multi;
System.out.println(value);
return value;
}
private Integer executeMethodAsync() throws ExecutionException,InterruptedException {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(this::firstMethod);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(this::secondMethod);
CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(this::thirdMethod);
CompletableFuture.allOf(future1,future2,future3).join();
Integer total = sum(future1.get(),future2.get());
Integer diff = diff(future3.get(),future1.get());
CompletableFuture<Integer> future4 = CompletableFuture.supplyAsync(() -> divide(total,diff));
CompletableFuture<Integer> future5 = CompletableFuture.supplyAsync(() -> multiply(total,diff));
Function<Void,Integer> function = (voidValue) -> {
try {
return future4.get() + future5.get();
} catch (InterruptedException | ExecutionException e) {
return null;
}
};
CompletableFuture<Integer> finalTotal = CompletableFuture.allOf(future4,future5).thenApplyAsync(function);
System.out.println("Final Total :: " + finalTotal.get());
return finalTotal.get();
}
private int divide(Integer total,Integer diff) {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printstacktrace();
}
System.out.println("divide");
return total / diff;
}
private int multiply(Integer total,Integer diff) {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printstacktrace();
}
System.out.println("multiply");
return total * diff;
}
private Integer diff(Integer a,Integer b) {
return a - b;
}
private int firstMethod() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printstacktrace();
}
System.out.println("1");
return 1;
}
private int secondMethod() {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printstacktrace();
}
System.out.println("2");
return 2;
}
private int thirdMethod() {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printstacktrace();
}
System.out.println("3");
return 3;
}
}
程序的顺序执行需要 18 秒,异步执行需要 11 秒,我试图在反应式编程中实现相同的目标。如何实现这一目标?
更新
根据martin提供的线索,重新编写如下方法,运行正常
private Integer executeMethodReactive() {
Scheduler scheduler = Schedulers.parallel();
Mono<Integer> externalMono = Mono.zip(Mono.fromCallable(this::firstMethod).log("first").subscribeOn(scheduler),Mono.fromCallable(this::secondMethod).log("second").subscribeOn(scheduler),Mono.fromCallable(this::thirdMethod).log("third").subscribeOn(scheduler)).flatMap(data -> {
Integer total = sum(data.getT1(),data.getT1());
return Mono.zip(Mono.fromCallable(() -> divide(total,diff)).log("divide").subscribeOn(scheduler),Mono.fromCallable(() -> multiply(total,diff)).log("multiply").subscribeOn(scheduler))
.flatMap(inner -> Mono.just(inner.getT1() + inner.getT2()));
});
return externalMono.block();
}
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)