How to put data from Spark into an Ignite cache

Problem description

I run several Spark jobs that compute RDDs, and at the end I want to put some of that data into an Ignite cache. Unfortunately, I get the following error:

java.lang.ClassCastException: org.apache.ignite.internal.processors.cache.IgniteCacheProxyImpl cannot be cast to org.apache.ignite.internal.processors.cache.GatewayProtectedCacheProxy
[info]  at org.apache.ignite.internal.processors.cache.GatewayProtectedCacheProxy.equals(GatewayProtectedCacheProxy.java:1715)
[info]  at scala.collection.mutable.FlatHashTable$class.findElemImpl(FlatHashTable.scala:131)
[info]  at scala.collection.mutable.FlatHashTable$class.containsElem(FlatHashTable.scala:124)
[info]  at scala.collection.mutable.HashSet.containsElem(HashSet.scala:40)
[info]  at scala.collection.mutable.HashSet.contains(HashSet.scala:57)
[info]  at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:87)
[info]  at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitExternalizable(SerializationDebugger.scala:142)
[info]  at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:104)
[info]  at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:206)
[info]  at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:108)
[info]  at org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:67)
[info]  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
[info]  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
[info]  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
[info]  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
[info]  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
[info]  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
[info]  at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
[info]  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:371)
[info]  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
[info]  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
[info]  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
[info]  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
[info]  at org.apache.spark.rdd.RDD.map(RDD.scala:370).........
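
This stack trace shows Spark's ClosureCleaner failing while serializing a map closure: the SerializationDebugger keeps a HashSet of already-visited objects, the set lookup invokes GatewayProtectedCacheProxy.equals on the captured Ignite cache proxy, and that equals call throws the ClassCastException. In other words, an IgniteCache proxy was captured inside an RDD closure, and cache proxies cannot be shipped to Spark tasks. A reconstructed sketch of the pattern that typically produces this trace (hypothetical; the original post does not include the failing code, and the cache name is a placeholder):

import org.apache.ignite.Ignition
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical reconstruction of the failing pattern.
val sc = new SparkContext(new SparkConf().setAppName("repro"))
val rdd = sc.parallelize(1 to 100).map(i => (i, s"value-$i"))

val ignite = Ignition.start()
val cache = ignite.getOrCreateCache[Int, String]("myCache")

// The map closure captures `cache` (a GatewayProtectedCacheProxy);
// Spark tries to serialize it into the task and fails as above.
rdd.map { case (k, v) => cache.put(k, v) }.count()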

So my question is: how do I put data from a Spark RDD into a specific Ignite cache (in our case, an Ignite cache with 3rd-party persistence, i.e. a CacheStore implemented on top of Postgres)?

Solution

No effective solution to this problem has been found yet.

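Although no confirmed answer was recorded, Apache Ignite ships a Spark integration module (ignite-spark) whose IgniteContext/IgniteRDD API is designed for exactly this task: it starts an Ignite client on each executor and writes RDD partitions into a named cache, so no cache proxy is ever captured in a closure. A minimal sketch, assuming a cache named "myCache" with the Postgres-backed CacheStore is declared in config/example-ignite.xml (both names are placeholders):

import org.apache.ignite.spark.IgniteContext
import org.apache.spark.{SparkConf, SparkContext}

object RddToIgnite {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("rdd-to-ignite"))

    // Starts an embedded Ignite client on the driver and on each executor,
    // using the given Spring XML configuration (path is a placeholder).
    val ic = new IgniteContext(sc, "config/example-ignite.xml")

    val rdd = sc.parallelize(1 to 1000).map(i => (i, s"value-$i"))

    // fromCache returns an IgniteRDD view over the named cache;
    // savePairs streams each RDD partition into it from the executors.
    ic.fromCache[Int, String]("myCache").savePairs(rdd)

    ic.close(true) // also stop the Ignite clients on the workers
    sc.stop()
  }
}

savePairs uses an IgniteDataStreamer under the hood, so it is worth verifying in your setup that entries propagate through the cache's write-through store into Postgres. An alternative is rdd.foreachPartition with cache.put, obtaining the Ignite client inside the partition function (e.g. via Ignition.getOrStart) instead of capturing it, which sidesteps the serialization problem for the same reason.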