Hazelcast Jet 与 Java 8 流

问题描述

我正在尝试使用 Hazelcast Jet 根据最大​​日期对 List<Map<String,Object>> 对象进行排序。

这是我的 java 8 代码:

public static List<Map<String,Object>> extractDate1(List<Map<String,Object>> data) {
    return data.stream().map(value -> new Object() {
        Map<String,Object> theMap = value;
        LocalDate date = extractDate(value);
    }).sorted(Comparator.comparing(obj -> obj.date)).map(obj -> obj.theMap).collect(Collectors.toList());
}

public static LocalDate extractDate(Map<String,Object> value) {
    DateTimeFormatter formatter1 = DateTimeFormatter.ofPattern("dd-MM-yyyy");
    DateTimeFormatter formatter2 = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("d-MM-yyyy");
    return LocalDate.parse(LocalDate.parse(value.get("effectiveDate").toString(),formatter2).format(formatter1),formatter);
}

上面的java 8代码将地图对象从低到高排序:

下面是我试图提取的 Jet 代码,也给出了正确的输出。但我只想利用hazelcast jet 聚合/滚动功能

// fetching jsonb type data from db
BatchStage<Object> jobJson = dbValue
        // this model holds the string json value
        // converting json data to Map object
        .map(model -> JsonUtil.mapFrom(model.getJosnValue())
    .filter(map -> map.size() != 0)
    .map(map -> {
            // each json/map object will be having an array and again an array will I have multiple json objects in the
            // I'm filtering json objects based on max date 
      List<Map<String,Object>> extractedDateValue;
            if (map.containsKey("records")) {
         //Here I'm calling external function (above java 8 code)
                 extractedDateValue = extractMapBasedOnMax(
                        (List<Map<String,Object>>) map.get("records"));
            }
                
            return extractedDateValue.get(extractedDateValue.size() - 1);
        });

JSON 数据示例:

{
    "id": "01","records": [{
        "location": "xyz1","effectiveDate": "02-03-2021"
    },{
        "location": "xyz2","effectiveDate": "02-04-2021"
    }]
}

预期输出:

{
  "location": "xyz2","effectiveDate": "02-04-2021"
}

是否可以通过 Hazelcast Jet 滚动聚合来实现这一点?或者任何建议都会有所帮助..谢谢

解决方法

考虑 flatMapping 管道并使用 topN 找到最大值。 flatMap 会将每个 JSON 结构转换为一系列 [id,location,EffectiveDate] 记录。有关代码示例,请参阅 flatMap 的文档。

不清楚您是要在整个集合中查找最大元素还是每个 id 的最大元素。添加 groupingKey 会找到每个 ID 的最大值。

“元代码”中的管道形状:

source // stream of JSON structures
.flatMap // stream [id,effectiveDate]
.groupingKey // for maximum per id,remove for global max
.aggregate(AggregateOpperations.topN) // finds max 
.sink;

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...