问题描述
sales["NEW"]=np.where(((sales.Status=='Done') |
(sales.Status=='out') |
(sales.Status=='in') |
(sales.sumPaid>dailyAmount) |
(sales.days<14)),'No','Yes')
validationObject 被传递到每个验证中,如果存在任何错误,则将附加到其中,最后将返回对象。 目前它需要更多时间,因为执行是顺序的。
我可以并行执行验证并合并每个线程中的响应并发回吗?
是否可以通过 Completable Futures 实现这一点?我找不到这方面的例子。 请哪位大神指点一二?
解决方法
是的,它可以使用 CompletableFuture
库执行。
我可以与您分享一个示例,尝试为您的问题陈述实现相同的示例。
-
创建将接受的 CompletableFuture 对象列表 ValidationObject 类型作为输入。
List<CompletableFuture<ValidationObject>> completableFutureList = new ArrayList<>();
-
将验证函数添加到列表中。
completableFutureList.add(validation1(validationObj)); completableFutureList.add(validation2(validationObj));
-
验证方法必须返回相同的类型(在我的示例中:CompletableFuture)。 CompletableFuture 不接受 void 类型,至少必须返回某种类型。如果你的方法没有返回任何东西,那么至少返回一些字符串,例如:CompletableFuture。这里的“asyncExecutor”只不过是我们在 ThreadPoolTaskExecutor 配置中分配的自定义@bean 名称。
此链接中的更多详细信息:https://www.oodlestechnologies.com/blogs/threadpooltaskexecutor-configuration-with-spring-boot/
@Async("asyncExecutor")
public CompletableFuture<String> validation1(ValidationObject validation)
{
//your code here
}
-
然后将各个异步任务连接在一起,以便执行器只有在收到所有线程的结果后才能继续执行代码的其他部分。如果您不加入,则其他代码将被执行,并且每个线程中的验证方法将异步执行。
for (CompletableFuture<ValidationObject> completableFuture : completableFutureList) { completableFuture.join(); }
您可以选择 java8 Streams's parallel processing
。为了模拟您的场景,我创建了一些随机验证函数,对数据进行某种处理。
import java.util.stream.Stream;
public class StackOverFlow_65531407 {
public static void main(String[] args) {
StackOverFlow_65531407 stack = new StackOverFlow_65531407();
ValidationObject vObj = new ValidationObject(1l,"name","Password","role");
Long t1 = System.currentTimeMillis();
stack.validate(vObj);
Long t2 = System.currentTimeMillis();
stack.validate_parallel(vObj);
Long t3 = System.currentTimeMillis();
System.out.println("---------------------------------------");
System.out.println("time in non-parallel execution::" + (t2 - t1) + " milliseconds");
System.out.println("time in parallel execution::" + (t3 - t2) + " milliseconds");
System.out.println("---------------------------------------");
}
public ValidationObject validate(ValidationObject validationObj) {
v1(validationObj);
v2(validationObj);
v3(validationObj);
v4(validationObj);
return validationObj;
}
public ValidationObject validate_parallel(ValidationObject validationObj) {
Stream.of(validationObj)
.parallel()
.map(this::v1)
.map(this::v2)
.map(this::v3)
.map(this::v4);
return validationObj;
}
private ValidationObject v1(ValidationObject validationObj) {
//do validation on id
if (validationObj.getId() != null) {
System.out.println(validationObj.getId());
for (int i = 0; i < 10000; i++) {
System.out.println("\tPrinted " + i);
}
}
return validationObj;
}
private ValidationObject v2(ValidationObject validationObj) {
//do validation on name
if (validationObj.getName() != null) {
System.out.println(validationObj.getName());
for (int i = 0; i < 10000; i++) {
System.out.println("\tPrinted " + i);
}
}
return validationObj;
}
private ValidationObject v3(ValidationObject validationObj) {
//do validation on password
if (validationObj.getPassword() != null) {
System.out.println(validationObj.getPassword());
for (int i = 0; i < 10000; i++) {
System.out.println("\tPrinted " + i);
}
}
return validationObj;
}
private ValidationObject v4(ValidationObject validationObj) {
//do validation on role
if (validationObj.getRole() != null) {
System.out.println(validationObj.getPassword());
for (int i = 0; i < 1000; i++) {
System.out.println("\tPrinted " + i);
}
}
return validationObj;
}
}
对于较少数量的输入(例如打印到 100 或数千),流并行会很慢。但在更大的情况下,流式输出运行顺序处理。
我的输出为:
----
---
Printed 987
Printed 988
Printed 989
Printed 990
Printed 991
Printed 992
Printed 993
Printed 994
Printed 995
Printed 996
Printed 997
Printed 998
Printed 999
---------------------------------------
time in non-parallel execution::249 milliseconds
time in parallel execution::96 milliseconds
---------------------------------------
您可以看到并行流如何显着减少您的时间。但使用并行流也是一种固执的选择。