Spark Scala job experiencing long-running tasks in the final job

Problem description

I have a Spark Scala job on EMR that runs fine until the final job, where I then see no progress for 2 hours. All the executors show that tasks are evenly distributed, so I don't believe this is a data-skew problem. The job does complete, but I had found a bug where a map was being overwritten instead of updated; since fixing that bug I see this slowness and don't know how to resolve it. This part of the job reads from and writes to Redis and uses two Scala mutable maps. I'm not sure what is causing the slowdown, so any help would be greatly appreciated. The dataset is repartitioned by col("id1").
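The overwrite-versus-update distinction described above can be sketched in isolation. This is a minimal illustration with made-up keys and value types, not the job's actual data:

```scala
import scala.collection.mutable

// Outer map keyed by id1, inner map keyed by id2, mirroring the job.
val outer = mutable.Map[String, mutable.Map[Long, Seq[Long]]]()

// Buggy pattern: putting a fresh inner map each time overwrites
// whatever was accumulated for that key before.
outer.put("id1", mutable.Map(1L -> Seq(10L)))
outer.put("id1", mutable.Map(2L -> Seq(20L))) // entry for key 1L is lost

// Fixed pattern: fetch (or create) the existing inner map and update it
// in place, so earlier entries survive.
val inner = outer.getOrElseUpdate("id2", mutable.Map[Long, Seq[Long]]())
inner.update(1L, Seq(10L))
inner.update(2L, Seq(20L)) // entry for key 1L is kept
```

`getOrElseUpdate` collapses the "check, put if absent, then look up" steps into one call.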

import java.sql.Timestamp
import java.time.ZonedDateTime
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.{Dataset, Row, RowFactory}

val result: Dataset[Row] = rowsToCompute.mapPartitions(iterator => {
  case class countDateMap(count: Long, dateTime: ZonedDateTime)
  val id1CountDateMap = scala.collection.mutable.Map[String, scala.collection.mutable.Map[Long, Seq[countDateMap]]]()

  var rowResult: ListBuffer[Row] = ListBuffer()
  RedisClient.writeHost = writeHost.value
  RedisClient.readHost = readHost.value
  RedisClient.port = port.value
  RedisClient.keyTTL = keyTTL.value
  RedisClient.writePoolConfig.setMaxIdle(maxIdle.value)
  RedisClient.writePoolConfig.setMaxTotal(writeMaxTotal.value)
  RedisClient.writePoolConfig.setMaxWaitMillis(poolWaitTimeout.value)
  RedisClient.readPoolConfig.setMaxIdle(maxIdle.value)
  RedisClient.readPoolConfig.setMaxWaitMillis(poolWaitTimeout.value)
  RedisClient.readPoolConfig.setMaxTotal(readMaxTotal.value)

  iterator.foreach(row => {
    val id1 = row.getAs("id1").asInstanceOf[String]
    val availableRowsForId1 = row.getAs("rows").asInstanceOf[Seq[Row]]
    var rowsMap: scala.collection.mutable.Map[Long,Seq[countDateMap]] = scala.collection.mutable.Map[Long,Seq[countDateMap]]()
    if (!id1CountDateMap.isDefinedAt(id1)) {
      id1CountDateMap.put(id1,rowsMap)
    }
    rowsMap = id1CountDateMap(id1)
    
    availableRowsForId1.foreach(o => {
        val id2 = o.getAs("id2").asInstanceOf[Long]
        val currentRowCountDateTime = id1CountDateMap(id1).get(id2)
        var restriction1 = false
        var restriction2 = false
        var localCount: Long = 1
        var dateTime = o.getAs("dateTime").asInstanceOf[Timestamp]
        if (currentRowCountDateTime.isDefined) {
          //business logic that checks restriction1 & restriction2 based on currentRowCountDateTime
        } 

        if (!restriction1 && !restriction2) {
          val set = RedisClient.setCount(id2.toString,(o.getAs("redisCount").asInstanceOf[String] + 1).toString)
          val newCountDate= countDateMap(localCount,dateTime)
          var newCountDateSeq = Seq(newCountDate)
          if (currentRowCountDateTime.nonEmpty) {
            newCountDateSeq = currentRowCountDateTime.get.union(Seq(newCountDate))
          }
          rowsMap.update(id2,newCountDateSeq)
          rowResult += RowFactory.create(id1,id2.toString)
        }
      })
    })
  rowResult.toList.iterator
})(encoder)
// Old version that ran quickly but had a bug: rowsMap got overwritten instead of updated
val result: Dataset[Row] = rowsToCompute.mapPartitions(iterator => {
  case class countDateMap(count: Long, dateTime: ZonedDateTime)
  // A concrete concurrent map is needed here (ConcurrentMap is an interface);
  // TrieMap supports the putIfAbsent/apply/replace calls used below.
  val id1CountDateMap = scala.collection.concurrent.TrieMap[String, scala.collection.mutable.Map[Long, Seq[countDateMap]]]()

  var rowResult: ListBuffer[Row] = ListBuffer()
  RedisClient.writeHost = writeHost.value
  RedisClient.readHost = readHost.value
  RedisClient.port = port.value
  RedisClient.keyTTL = keyTTL.value
  RedisClient.writePoolConfig.setMaxIdle(maxIdle.value)
  RedisClient.writePoolConfig.setMaxTotal(writeMaxTotal.value)
  RedisClient.writePoolConfig.setMaxWaitMillis(poolWaitTimeout.value)
  RedisClient.readPoolConfig.setMaxIdle(maxIdle.value)
  RedisClient.readPoolConfig.setMaxWaitMillis(poolWaitTimeout.value)
  RedisClient.readPoolConfig.setMaxTotal(readMaxTotal.value)

  iterator.foreach(row => {
    val id1 = row.getAs("id1").asInstanceOf[String]
    val availableRowsForId1 = row.getAs("rows").asInstanceOf[Seq[Row]]
    var rowsMap= scala.collection.mutable.Map[Long,Seq[countDateMap]]()
    id1CountDateMap.putIfAbsent(id1, rowsMap)
    
    availableRowsForId1.foreach(o => {
        val id2 = o.getAs("id2").asInstanceOf[Long]
        val currentRowCountDateTime = id1CountDateMap(id1).get(id2)
        var restriction1 = false
        var restriction2 = false
        var localCount: Long = 1
        var dateTime = o.getAs("dateTime").asInstanceOf[Timestamp]
        if (currentRowCountDateTime.isDefined) {
          //business logic that checks restriction1 & restriction2 based on currentRowCountDateTime
        } 

        if (!restriction1 && !restriction2) {
          val set = RedisClient.setCount(id2.toString,(o.getAs("redisCount").asInstanceOf[String] + 1).toString)
          val newCountDate = countDateMap(localCount,dateTime)
          var newCountDateSeq = Seq(newCountDate)
          if (currentRowCountDateTime.nonEmpty) {
            newCountDateSeq = currentRowCountDateTime.get.union(Seq(newCountDate))
          }
          rowsMap.update(id2,newCountDateSeq)
          id1CountDateMap.replace(id1,rowsMap)
          rowResult += RowFactory.create(id1,id2.toString)
        }
      })
    })
  rowResult.toList.iterator
})(encoder)
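Since the partition loop above makes one synchronous Redis round trip per row (`RedisClient.setCount`), one way to check whether those calls account for the stalled hours is to time them. Below is a minimal sketch of a timing wrapper; the helper name and usage are my own, not part of the job:

```scala
// Hypothetical helper: measure the wall time of any expression, e.g. a
// per-row RedisClient.setCount call inside the partition loop.
def timed[T](body: => T): (T, Long) = {
  val start = System.nanoTime()
  val result = body // run the wrapped call
  (result, System.nanoTime() - start)
}

// Usage sketch (a cheap computation stands in for the Redis call):
val (value, nanos) = timed { (1 to 1000).sum }
// Accumulating `nanos` across all rows of a partition and logging the
// total per task would show how much of the runtime the calls consume.
```

If the per-call latency times the row count approaches the observed task duration, the network round trips, rather than the map logic, are the bottleneck.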

Task metrics summary

(Screenshot of the Spark task metrics summary omitted.)
