问题描述
我需要计算正在流式传输的对象列表的平均值。 这些对象具有:
ClassX.id
ClassX.name
ClassX.value
ClassX.startTime
ClassX.endTime
对象必须按ClassX.name
分组,并使用ClassX.value
计算平均值。
流式传输的每个对象代表交易的开始或结束。
开始交易具有ClassX.endTime == null
。
结束交易具有ClassX.startTime == null
。
最终交易中有ClassX.name == null
要汇总的值在 start 对象之内,但是只有在流还处理转换的相应结束对象的情况下,也必须将其汇总为平均值。
这是我到目前为止所拥有的(基于 Andreas 的建议):
List<ClassX> classXList = ...
Map<String,Double> average = classXListStrings.stream()
.map(ClassX::new) //convert to ClassX(the input list is actually String)
.filter(x -> x.getName() != null) //avoid null entries for getName
.collect(Collectors.groupingBy(ClassX::getName,Collectors.toList()))
.entrySet().stream()
// skip group if no end transaction exists
.filter(e -> e.getValue().stream().anyMatch(x -> x.getStartTime() != null))
.collect(Collectors.toMap(Entry::getKey,e -> e.getValue().stream()
// only average values of start transactions
.filter(x -> x.getEndTime() == null)
.collect(Collectors.averagingDouble(ClassX::getValue))
));
是否有一种方法可以存储流式传输到数据结构中的对象,然后仅在基于过滤器流式传输一对对象的开始/结束事务时才聚合值?
解决方法
很难将流中的一个对象与稍后出现的另一个对象相关联。
一个解决方案在列表中运行了两次:首先,找到最终交易,将它们收集到一组中。然后,您再次处理列表,计算平均值。
List<ClassX> inputList = ...
Set<String> endSet = inputList.stream()
.filter(o -> o.endTime != null)
.map(o -> o.id)
.collect(Collectors.toSet());
Map<String,Double> average = inputList.stream()
.filter(o -> o.startTime != null && endSet.contains(o.id))
.collect(Collectors.groupingBy(
o -> o.name,Collectors.averagingDouble(o -> o.value)));
,
您可以这样做:
List<ClassX> classXList = ...
Map<String,Double> average = classXList.stream()
.collect(Collectors.groupingBy(ClassX::getName,Collectors.toList()))
.entrySet().stream()
// skip group if no end transaction exists
.filter(e -> e.getValue().stream().anyMatch(x -> x.getStartTime() == null))
.collect(Collectors.toMap(Entry::getKey,e -> e.getValue().stream()
// only average values of start transactions
.filter(x -> x.getEndTime() == null)
.collect(Collectors.averagingDouble(ClassX::getValue))
));