问题描述
我正在尝试过滤 BatchStage 但它不起作用它给了我以下错误:
BatchStage<Object> bd = jdbcBatchStageData.flatMap(list -> Traversers.traverseArray(list.toArray()));
bd.filter(k -> {
return true;
}).writeTo(Sinks.logger());
我的代码如下,我做错了什么?
这是一个工作:
BatchStage<Object> bd = jdbcBatchStageData.flatMap(list -> Traversers.traverseArray(list.toArray()));
bd.filter(k -> {
return filterItems((List<Map<String,Object>>)rules.get("criteria"));
}).writeTo(Sinks.logger());
public static boolean filterItems(List<Map<String,Object>> innerrules) {
return true;
}
这并不奇怪:
COUNT()
我不知道为什么它不起作用,帮帮我。 如果我删除过滤器,它可以正常工作,但我需要过滤。
解决方法
我们序列化 Pipeline
并将其发送到集群执行。这意味着所有字段都必须是可序列化的。在您的情况下,您的 lambda 捕获了一些不可序列化的局部变量。可能是 rules
地图。您需要将数据复制到一些可序列化的结构,例如到 HashMap
。或者像这样替换:
Criteria criteria = (List<Map<String,Object>>) rules.get("criteria");
bd.filter(k -> {
return filterItems(criteria);
})
如果 Criteria
是可序列化的并且 filterItems
方法是静态的,这将起作用 - 如果不是,this
(包含类实例)也将被捕获。
顺便说一句,我想知道您为什么在过滤器实现中根本不使用 k
,这很可能是一个错误。