Hazelcast jet flatMap 说:这些变换没有附加任何东西:[flat-map]

问题描述

我正在尝试过滤 BatchStage 但它不起作用它给了我以下错误:

BatchStage<Object> bd = jdbcBatchStageData.flatMap(list -> Traversers.traverseArray(list.toArray()));
         bd.filter(k -> { 
             
             return true;
             
         }).writeTo(Sinks.logger());

我的代码如下,我做错了什么?

这是一个工作:

BatchStage<Object> bd = jdbcBatchStageData.flatMap(list -> Traversers.traverseArray(list.toArray()));
         bd.filter(k -> { 
             
             return filterItems((List<Map<String,Object>>)rules.get("criteria"));
             
         }).writeTo(Sinks.logger());

public static boolean filterItems(List<Map<String,Object>> innerrules) {
    return true;
}

这并不奇怪:

COUNT()

我不知道为什么它不起作用,帮帮我。 如果我删除过滤器,它可以正常工作,但我需要过滤。

解决方法

我们序列化 Pipeline 并将其发送到集群执行。这意味着所有字段都必须是可序列化的。在您的情况下,您的 lambda 捕获了一些不可序列化的局部变量。可能是 rules 地图。您需要将数据复制到一些可序列化的结构,例如到 HashMap。或者像这样替换:

Criteria criteria = (List<Map<String,Object>>) rules.get("criteria");
bd.filter(k -> { 
    return filterItems(criteria);
})

如果 Criteria 是可序列化的并且 filterItems 方法是静态的,这将起作用 - 如果不是,this(包含类实例)也将被捕获。

顺便说一句,我想知道您为什么在过滤器实现中根本不使用 k,这很可能是一个错误。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...