问题描述
{
summary:Integer
uid:String
key:String
.....
}
我需要在某个时间范围内汇总摘要值,一旦达到特定数字,就将摘要和影响摘要的所有UID都刷新到数据库/日志文件中。
第一次刷新后,我要从内存中清除所有uid,然后立即刷新每个新项。
所以我尝试了这个聚合函数。
public class AggFunc implements AggregateFunction<Item,Acc,Tuple2<Integer,List<String>>>{
private static final long serialVersionUID = 1L;
@Override
public Acc createAccumulator() {
return new Acc());
}
@Override
public Acc add(Item value,Acc accumulator) {
accumulator.inc(value.getSummary());
accumulator.addUid(value.getUid);
return accumulator;
}
@Override
public Tuple2<Integer,List<String>> getResult(Acc accumulator) {
List<String> newL = Lists.newArrayList(accumulator.getUids());
accumulator.setUids(Lists.newArrayList());
return Tuple2.of(accumulator.getSum(),newL);
}
@Override
public Acc merge(Acc a,Acc b) {
.....
}
}
在聚合过程函数中,我将列表刷新为状态,如果需要保存到数据库,则要清除状态并在状态中保存标志以指示状态。
但这似乎对我来说是歪曲的。而且我不确定这是否对我有效。
是否有更好的解决方案?
解决方法
使用功能丰富的状态进行处理。在您的状态下以及窗口触发刷新值时继续添加uid
。官方文档中的此页面有一个示例。
对于您而言,ListState
会很好地工作。
编辑:
以上解决方案适用于非窗口情况。对于窗口情况,只需使用带有可具有丰富窗口功能的apply函数的结块