Spark: GenericRowWithSchema cannot be cast to GenTraversableOnce

Problem Description

Here is my code:

    // Imports assumed by this snippet (Maps is taken to be Guava's;
    // JavaConverters supplies the asScala conversion on keys()).
    import com.google.common.collect.Maps
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
    import org.apache.spark.sql.types.StructType
    import scala.collection.JavaConverters._

    // One Spark partition per offset range; each partition pages through
    // Neo4j with its own SKIP/LIMIT query.
    val rdd = session.sparkContext
      .parallelize(offsets, offsets.size)
      .flatMap[Row](offset => {
        val query  = s"${config.exec} SKIP ${offset.start} LIMIT ${offset.size}"
        val result = new Neo4jSessionAwareIterator(neo4jConfig, query, Maps.newHashMap(), false)
        // Derive the field names and a StructType from the first record, if any.
        val fields = if (result.hasNext) result.peek().keys().asScala else List()
        val schema =
          if (result.hasNext)
            StructType(
              fields
                .map(k => (k, result.peek().get(k).`type`()))
                .map(keyType => CypherTypes.field(keyType)))
          else new StructType()
        // Convert every Neo4j record into a schema-carrying Row.
        result.map(record => {
          val row = new Array[Any](record.keys().size())
          for (i <- row.indices)
            row.update(i, Executor.convert(record.get(i).asObject()))
          new GenericRowWithSchema(row, schema).asInstanceOf[Row]
        })
      })

    // isEmpty() is the first action, so it is the first point where the
    // flatMap closure above actually runs.
    if (rdd.isEmpty())
      throw new RuntimeException()
    // Read the schema off the first materialized row, then build the DataFrame.
    val schema = rdd.repartition(1).first().schema
    session.createDataFrame(rdd, schema)
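
For comparison, here is a minimal, self-contained sketch of the same flatMap-into-createDataFrame pattern with the Neo4j iterator replaced by an in-memory source; every name and value in it is illustrative, not taken from the original job. On an installation whose Scala version matches the one the application was built with, this runs cleanly:

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
    import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

    val spark = SparkSession.builder().master("local[*]").appName("repro").getOrCreate()

    val schema = StructType(Seq(
      StructField("id", LongType, nullable = false),
      StructField("name", StringType, nullable = true)))

    // Each start value stands in for one SKIP/LIMIT page of results.
    val starts = Seq(0L, 2L)
    val rowRdd = spark.sparkContext
      .parallelize(starts, starts.size)
      .flatMap[Row] { start =>
        (start until start + 2L).iterator.map { i =>
          new GenericRowWithSchema(Array[Any](i, s"name-$i"), schema).asInstanceOf[Row]
        }
      }

    spark.createDataFrame(rowRdd, schema).show()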

This works correctly when I run it on the server with spark-submit, or on my machine from the IDE, with spark-2.4.6.

But when I use spark-submit on my machine, or use spark-3.0 on the server, the cast exception is thrown at rdd.isEmpty().
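
A note on where the exception surfaces: RDD.flatMap is lazy, and isEmpty() is implemented with take(1) (both visible in the driver stack trace below), so rdd.isEmpty() is simply the first action that forces the closure to run. A small sketch of that laziness, reusing the session value from the code above:

    // A failure inside a lazy transformation only appears at the first action.
    val lazyRdd = session.sparkContext
      .parallelize(Seq(1, 2, 3))
      .map(i => if (i > 0) throw new IllegalStateException("fails lazily") else i)
    // Nothing has executed yet, so no exception so far.
    lazyRdd.isEmpty() // take(1) runs the closure on the first element and throws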

Here is my exception message (partial):

java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.collection.GenTraversableOnce
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:488)
        at scala.collection.Iterator$SliceIterator.hasNext(Iterator.scala:266)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
        at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
        at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
        at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
        at scala.collection.AbstractIterator.to(Iterator.scala:1429)
        at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
        at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)
        at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
        at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1429)
        at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1409)

Driver stacktrace:
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
        at org.apache.spark.rdd.RDD.take(RDD.scala:1382)
        at org.apache.spark.rdd.RDD.$anonfun$isEmpty$1(RDD.scala:1517)
        at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
        at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1517)
        at com.xx.xx.tools.importer.reader.Neo4JReader.read(ServerBaseReader.scala:146)

Thanks for your attention!

Workaround

No effective solution to this problem has been found yet; we are still looking into it.
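
One detail worth recording while a confirmed answer is pending. The top frame of the trace (Iterator.scala:484, nextCur) is inside Scala 2.12's Iterator#flatMap, which applies the user function (whose erased Function1.apply returns Object) and checkcasts the result to GenTraversableOnce before iterating over it. The message therefore means the closure handed back a bare GenericRowWithSchema at runtime instead of the Iterator[Row] the source returns, which suggests a Scala binary-compatibility mismatch rather than a logic error: Spark 2.4.6's default distribution is built against Scala 2.11, while Spark 3.0 supports only Scala 2.12 (spark-submit --version prints the Scala version of an installation). This is offered as context, not a verified fix. A tiny illustration of the failing cast itself, with a stand-in value:

    import scala.collection.GenTraversableOnce

    // Illustrative only: the same checkcast that Scala 2.12's Iterator#flatMap
    // applies to the erased result of the user-supplied function.
    val f: Int => AnyRef = i => java.lang.Integer.valueOf(i) // not a collection
    f(1).asInstanceOf[GenTraversableOnce[Int]] // throws java.lang.ClassCastException:
    // java.lang.Integer cannot be cast to scala.collection.GenTraversableOnce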
