scala – 任务不可序列化Flink

我试图在flink中进行pagerank基本示例,稍加修改(仅在读取输入文件时,其他一切都是相同的)我得到错误,因为任务不可序列化,下面是输出错误的一部分

atorg.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:179)
at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:171)

以下是我的代码

object hpdb {

  def main(args: Array[String]) {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val maxIterations = 10000

    val damPENING_FACTOR: Double = 0.85

    val EPSILON: Double = 0.0001

    val outpath = "/home/vinoth/bigdata/assign10/pagerank.csv"

    val links = env.readCsvFile[Tuple2[Long,Long]]("/home/vinoth/bigdata/assign10/ppi.csv",fieldDelimiter = "\t",includedFields = Array(1,4)).as('sourceId,'targetId).toDataSet[Link]//source and target

    val pages = env.readCsvFile[Tuple1[Long]]("/home/vinoth/bigdata/assign10/ppi.csv",includedFields = Array(1)).as('pageId).toDataSet[Id]//Pageid

    val noOfPages = pages.count()

    val pagesWithRanks = pages.map(p => Page(p.pageId,1.0 / noOfPages))

    val adjacencyLists = links
      // initialize lists ._1 is the source id and ._2 is the traget id
      .map(e => AdjacencyList(e.sourceId,Array(e.targetId)))
      // concatenate lists
      .groupBy("sourceId").reduce {
      (l1,l2) => AdjacencyList(l1.sourceId,l1.targetIds ++ l2.targetIds)
    }

    // start iteration

    val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
     // **//the output shows error here**     
     currentRanks =>
        val newRanks = currentRanks
          // distribute ranks to target pages
          .join(adjacencyLists).where("pageId").equalTo("sourceId") {
          (page,adjacent,out: Collector[Page]) =>
            for (targetId <- adjacent.targetIds) {
              out.collect(Page(targetId,page.rank / adjacent.targetIds.length))
            }
        }

          // collect ranks and sum them up

          .groupBy("pageId").aggregate(SUM,"rank")
          // apply dampening factor
         //**//the output shows error here** 
           .map { p =>
          Page(p.pageId,(p.rank * damPENING_FACTOR) + ((1 - damPENING_FACTOR) / pages.count()))
        }

        // terminate if no rank update was significant
        val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") {
          (current,next,out: Collector[Int]) =>
            // check for significant update
            if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
        }

        (newRanks,termination)
    }

    val result = finalRanks

    // emit result
    result.writeAsCsv(outpath,"\n"," ")

    env.execute()

    }
}

任何正确方向的帮助都受到高度赞赏?谢谢.

解决方法

问题是您从MapFunction中引用DataSet页面.这是不可能的,因为DataSet只是数据流的逻辑表示,不能在运行时访问.

解决此问题,您需要做的是将val pagesCount = pages.count值赋给变量pagesCount并在MapFunction中引用此变量.

pages.count实际上做的是触发数据流图的执行,以便可以计算页中元素的数量.然后结果返回到您的程序.

相关文章

共收录Twitter的14款开源软件,第1页Twitter的Emoji表情 Tw...
Java和Scala中关于==的区别Java:==比较两个变量本身的值,即...
本篇内容主要讲解“Scala怎么使用”,感兴趣的朋友不妨来看看...
这篇文章主要介绍“Scala是一种什么语言”,在日常操作中,相...
这篇文章主要介绍“Scala Trait怎么使用”,在日常操作中,相...
这篇文章主要介绍“Scala类型检查与模式匹配怎么使用”,在日...