问题描述
我正在尝试将数据加载到 jet pipeLine 阶段内的 IMap 中,但出现错误
这是我的代码
public static Pipeline pipeLineStage(JetInstance jet) {
Pipeline pipeLine = Pipeline.create();
BatchStage<DataModel> dbValue = pipeLine.readFrom(Sources.jdbc(
"jdbc:postgresql://localhost/postgres?user=postgres&password=root","SELECT id1,id2,id3,id4\r\n"
+ " FROM public.tbl_test where id1='3'",resultSet -> new DataModel(resultSet.getString(2),resultSet.getString(3),resultSet.getString(4))));
dbValue.filter(model -> model.getId2().equals("person"))
.map(model -> JsonUtil.mapFrom(model.getObject_value())).map(map -> {
IMap<Object,Object> map1 = jet.getMap("map1");
map1.put("employee_id",map.get("id"));
return map;
}).writeTo(Sinks.logger());
return pipeLine;
}
错误:-
Exception in thread "main" java.lang.IllegalArgumentException: "mapFn" must be serializable
at com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:203)
*如果我将数据存储在普通的 Map
中,我不会收到任何错误,只有当我存储在 IMap
对象中时才会收到错误,并且在上面的代码中我使用的是模型类 i,e DataModel
并实现 public class DataModel implements Serializable {}
..... 任何建议也会有所帮助.. 谢谢 *
解决方法
看来你已经配置了一个名为mapFn的需要序列化的序列化工厂。只需在类定义中添加可序列化的实现即可。