How do I handle a "key not found" error in Spark?

Problem description

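I am using Pregel to implement an iterative algorithm. The code compiles, but at runtime it throws the error "Key not found: 3", where 3 is a vertex (node) ID. I am stuck and do not see how to handle it. I suspect the error comes from the computation of labels_score in vertexProgram, but I have not found a way to fix it.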
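For reference, this message matches what Scala's Map types throw from apply (map(key)) when the key is absent: a java.util.NoSuchElementException whose text is "key not found: " followed by the key. The minimal sketch below is plain Scala without Spark, and the weights map is hypothetical; it reproduces the exception and shows get and getOrElse, which do not throw.

```scala
import scala.util.{Failure, Try}

object KeyNotFoundDemo {
  // Hypothetical lookup table keyed by VertexId (a Long); vertex 3 is deliberately absent.
  val weights: Map[Long, Double] = Map(1L -> 0.5, 2L -> 0.8)

  // Map.apply (weights(key)) throws java.util.NoSuchElementException for a missing key.
  def strictLookup(key: Long): Try[Double] = Try(weights(key))

  // Map.getOrElse returns the supplied default instead of throwing.
  def safeLookup(key: Long): Double = weights.getOrElse(key, 0.0)

  def main(args: Array[String]): Unit = {
    strictLookup(3L) match {
      case Failure(e) => println(e.getMessage) // key not found: 3
      case other      => println(other)
    }
    println(weights.get(3L)) // None
    println(safeLookup(3L))  // 0.0
  }
}
```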

Any help with this code would be greatly appreciated.

Here is the Pregel code:

import scala.collection.mutable
import scala.reflect.ClassTag
import org.apache.spark.graphx._

// most_similar, broadcastvariable, beta and p are defined elsewhere in the program.
def run2[VD, ED: ClassTag](graph: Graph[VD, ED], maxSteps: Int): Graph[mutable.HashMap[VertexId, (Double, VertexId)], ED] = {

  // Every vertex starts as its own community with score 1.0.
  val temp_graph = graph.mapVertices { case (vid, _) => mutable.HashMap[VertexId, (Double, VertexId)](vid -> (1.0, vid)) }

  // Send each endpoint the other endpoint's labels, wrapping each value in a List so messages can be concatenated.
  def sendMessage(e: EdgeTriplet[mutable.HashMap[VertexId, (Double, VertexId)], ED]): Iterator[(VertexId, mutable.HashMap[VertexId, List[(Double, VertexId)]])] = {
    val msg1 = e.dstAttr.map { case (k, v) => (k, List(v)) }
    val msg2 = e.srcAttr.map { case (k, v) => (k, List(v)) }
    Iterator((e.srcId, msg1), (e.dstId, msg2))
  }

  // Merge two messages: union of the labels, concatenating the (score, label) lists of shared labels.
  def mergeMessage(count1: mutable.HashMap[VertexId, List[(Double, VertexId)]],
                   count2: mutable.HashMap[VertexId, List[(Double, VertexId)]]): mutable.HashMap[VertexId, List[(Double, VertexId)]] = {

    val communityMap = new mutable.HashMap[VertexId, List[(Double, VertexId)]]

    (count1.keySet ++ count2.keySet).foreach { key =>
      val count1Val: List[(Double, VertexId)] = count1.getOrElse(key, Nil)
      val count2Val: List[(Double, VertexId)] = count2.getOrElse(key, Nil)
      communityMap += key -> (count1Val ::: count2Val)
    }
    communityMap
  }

  // Score every candidate label, keep the labels close to the best one, and normalize.
  def vertexProgram(vid: VertexId, attr: mutable.HashMap[VertexId, (Double, VertexId)],
                    message: mutable.HashMap[VertexId, List[(Double, VertexId)]]): mutable.HashMap[VertexId, (Double, VertexId)] = {

    if (message.isEmpty) attr
    else {

      val labels_score: mutable.HashMap[VertexId, Double] = message.map { key =>

        var value_sum = 0D
        var maxSimilar_result = 0D

        val max_similar = most_similar.filter(x => x._1 == vid).headOption match {
          case Some(x) => x._2 // most similar neighbor
          case _ => -1
        }

        maxSimilar_result = key._2.filter(v => v._2 == max_similar).headOption match {
          case Some(v) => v._1 // is the most similar node in the list?
          case _ => 0D
        }

        key._2.foreach { values =>
          // Map.apply: throws NoSuchElementException ("key not found: ...") if vid or values._2 is missing.
          value_sum += values._1 * (broadcastvariable.value(vid)(values._2)._2)
        }
        // Note: += adds onto the running sum; use = if a weighted average was intended.
        value_sum += (beta * value_sum) + ((1 - beta) * maxSimilar_result)

        (key._1, value_sum) // label list
      }

      val max_value = labels_score.maxBy(x => x._2)._2
      val dividedByMax: mutable.HashMap[VertexId, Double] = labels_score.map(x => (x._1, x._2 / max_value)) // divide by maximum value

      val resultMap: mutable.HashMap[VertexId, Double] = new mutable.HashMap[VertexId, Double]

      dividedByMax.foreach { row => // keep labels at or above the threshold p (0.75)
        if (row._2 >= p) resultMap += row
      }

      val max_for_normalize = resultMap.values.sum

      val res = resultMap.map(x => x._1 -> (x._2 / max_for_normalize, x._1)) // normalize labels

      res
    }
  }

  val initialMessage = mutable.HashMap[VertexId, List[(Double, VertexId)]]()

  val overlapCommunitiesGraph = Pregel(temp_graph, initialMessage, maxIterations = maxSteps)(
    vprog = vertexProgram, sendMsg = sendMessage, mergeMsg = mergeMessage)

  overlapCommunitiesGraph
}
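The merge step and the label-selection tail of vertexProgram do not depend on Spark, so they can be checked in isolation. The sketch below restates them as pure functions on immutable Map (instead of mutable.HashMap) under the same types; the object and function names are hypothetical, and the guard for an empty or all-zero score map is an addition, not part of the original code.

```scala
object LabelStepsSketch {
  type VertexId = Long // GraphX defines VertexId as an alias for Long
  type Msg = Map[VertexId, List[(Double, VertexId)]]

  // Same logic as mergeMessage: union of the label keys, concatenating both
  // (score, label) lists; a side that lacks the key contributes Nil.
  def mergeMessage(a: Msg, b: Msg): Msg =
    (a.keySet ++ b.keySet).map { key =>
      key -> (a.getOrElse(key, Nil) ::: b.getOrElse(key, Nil))
    }.toMap

  // Same logic as the tail of vertexProgram: divide by the best score, keep
  // labels whose ratio is >= p, then rescale the kept ratios to sum to 1.
  // Returns an empty map when there are no scores or the best score is 0.0,
  // which would otherwise divide by zero.
  def selectLabels(scores: Map[VertexId, Double], p: Double): Map[VertexId, (Double, VertexId)] =
    if (scores.isEmpty || scores.values.max == 0.0) Map.empty
    else {
      val maxValue = scores.values.max
      val kept = scores.collect { case (label, s) if s / maxValue >= p => label -> s / maxValue }
      val total = kept.values.sum
      kept.map { case (label, r) => label -> (r / total, label) }
    }

  def main(args: Array[String]): Unit = {
    val merged = mergeMessage(Map(1L -> List((0.5, 1L))), Map(1L -> List((0.2, 2L)), 2L -> List((1.0, 2L))))
    println(merged)
    println(selectLabels(Map(1L -> 4.0, 2L -> 3.0, 3L -> 1.0), 0.75))
  }
}
```

With p = 0.75, label 3 (ratio 0.25) is dropped, while labels 1 and 2 (ratios 1.0 and 0.75) are kept and rescaled so their scores sum to 1.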

Solution

No verified solution has been posted for this question yet.
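A plausible cause, not a confirmed fix: broadcastvariable.value(vid)(values._2) in vertexProgram calls Map.apply twice, so it throws as soon as either vid or values._2 is missing from the broadcast table. The second element of each tuple is a label ID (vertexProgram stores x._1 -> (score, x._1)), and labels spread past direct neighbors after the first supersteps, so if the table only covers neighbor pairs it will eventually be asked for a pair it was never built for. The sketch below is plain Scala; it assumes a hypothetical table shape Map[VertexId, Map[VertexId, (A, Double)]] (inferred from the ._2 access) and assumes a missing pair should contribute 0.0. If a missing pair instead means the table is built incorrectly, log the pairs with missingPairs and fix the table.

```scala
object SafeWeightLookup {
  type VertexId = Long // GraphX defines VertexId as an alias for Long

  // Hypothetical shape of broadcastvariable.value: vid -> (labelId -> (anything, weight)).
  type WeightTable[A] = Map[VertexId, Map[VertexId, (A, Double)]]

  // Non-throwing version of table(vid)(labelId)._2.
  def weight[A](table: WeightTable[A], vid: VertexId, labelId: VertexId): Option[Double] =
    table.get(vid).flatMap(_.get(labelId)).map(_._2)

  // Weighted sum over one label's (score, labelId) list; assumes a missing
  // (vid, labelId) pair should contribute 0.0.
  def weightedSum[A](table: WeightTable[A], vid: VertexId, entries: List[(Double, VertexId)]): Double =
    entries.map { case (score, labelId) => score * weight(table, vid, labelId).getOrElse(0.0) }.sum

  // Lists the (vid, labelId) pairs the table does not cover, useful for logging.
  def missingPairs[A](table: WeightTable[A], vid: VertexId, entries: List[(Double, VertexId)]): List[(VertexId, VertexId)] =
    entries.collect { case (_, labelId) if weight(table, vid, labelId).isEmpty => (vid, labelId) }

  def main(args: Array[String]): Unit = {
    val table: WeightTable[String] = Map(1L -> Map(2L -> ("a", 0.5)))
    val entries = List((2.0, 2L), (1.0, 3L))
    println(weightedSum(table, 1L, entries))  // 1.0
    println(missingPairs(table, 1L, entries)) // List((1,3))
    println(weightedSum(table, 3L, entries))  // 0.0
  }
}
```

Inside vertexProgram, the foreach loop over key._2 would then become value_sum = weightedSum(broadcastvariable.value, vid, key._2), assuming the broadcast value really has that shape.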
