问题描述
有一个 Scala Flink 应用程序,我在其中使用 Jackson 库解析 JSON。解析由自定义方法处理,并使用延迟启动概念来保持快速。
现在,无论出于何种原因,在 Flink 管道中进一步传递带有惰性值的模型会导致 util.Iterator
出现一些奇怪的错误,Kryo
是读取 JSON 的主干。我怀疑该问题实际上可能来自 .toList
,但我不知道如何确认。值得注意的是,在同一个(flink)map
中急切地初始化模型(使用 case class Root(items: Collection[Data])
case class Data(data: Collection[Double])
def toRoot(node: JsonNode): Root = {
val data: util.Iterator[JsonNode] = if (node.hasNonNull("items")) node.get("items").elements() else node.elements()
val items: Collection[Data] = data.asScala.map(x => toData(x))
Root(items)
}
)修复了这个问题。但事实并非如此,我想进一步传递我的懒惰模型。
最后,我提供了一个带有演示代码的存储库,但我也想在 StackOverflow 中提供所有详细信息。
示例模型和解析定义:
{
"items": [
{
"data": [
11.71476355252127,48.342882259940176,507.3,11.714791605037252,...
JSON 数据类似于:
map
并在一个 env.fromCollection(Seq(input))
.map(i => flatten(read(i)))
.print()
中完成所有工作:
env.fromCollection(Seq(input))
.map(i => read(i))
.map(i => flatten(i))
.print()
但进一步传递失败:
Caused by: java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:911)
at java.util.ArrayList$Itr.next(ArrayList.java:861)
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
有错误:
- Scala 2.11
Caused by: java.lang.NullPointerException
at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:80)
at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:57)
... 29 more
- Scala 2.12
Time_Of_Day
我已经创建了一个演示项目,其中包含所有准备用 Scala 2.11 和 2.12 进行测试的示例,因为它实际上给出了不同的结果 可用 HERE
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)