问题描述
我在 EMR 上有一个 Spark Scala 作业,它一直顺利运行到最后一个作业,然后在最后一个作业中我看到 2 小时没有任何进展。所有执行器都显示任务分布均匀,所以我不认为这是数据倾斜问题。作业最终能够完成,但我之前发现了一个 bug:它覆盖了 Map 而不是更新它;自从修复了这个 bug 后,就出现了这种缓慢,而我不知道如何解决。这部分作业读/写 Redis 并使用两个 Scala 可变 Map(mutable Map),我不确定是什么导致了这种缓慢,如能帮助将不胜感激。此数据集已按 col("id1") 重新分区。
val result: Dataset[Row] = rowsToCompute.mapPartitions(iterator => {
  // One accumulated (count, event-time) observation for a given id2.
  // NOTE(review): the original declared `zoneddatetime` but cast the column to
  // Timestamp; java.sql.Timestamp is used here for internal consistency — confirm.
  case class CountDate(count: Long, dateTime: java.sql.Timestamp)

  // id1 -> (id2 -> accumulated observations). mapPartitions executes
  // single-threaded per partition, so plain mutable maps are safe here.
  val id1CountDateMap =
    scala.collection.mutable.Map[String, scala.collection.mutable.Map[Long, scala.collection.mutable.ListBuffer[CountDate]]]()
  val rowResult: ListBuffer[Row] = ListBuffer()

  // Configure the per-executor Redis client from broadcast values.
  RedisClient.writeHost = writeHost.value
  RedisClient.readHost = readHost.value
  RedisClient.port = port.value
  RedisClient.keyTTL = keyTTL.value
  RedisClient.writePoolConfig.setMaxIdle(maxIdle.value)
  RedisClient.writePoolConfig.setMaxTotal(writeMaxTotal.value)
  RedisClient.writePoolConfig.setMaxWaitMillis(poolWaitTimeout.value)
  RedisClient.readPoolConfig.setMaxIdle(maxIdle.value)
  RedisClient.readPoolConfig.setMaxWaitMillis(poolWaitTimeout.value)
  RedisClient.readPoolConfig.setMaxTotal(readMaxTotal.value)

  iterator.foreach(row => {
    val id1 = row.getAs("id1").asInstanceOf[String]
    val availableRowsForId1 = row.getAs("rows").asInstanceOf[Seq[Row]]
    // getOrElseUpdate replaces the isDefinedAt/put/re-lookup dance and
    // guarantees we always mutate the map actually stored for id1.
    val rowsMap = id1CountDateMap.getOrElseUpdate(
      id1,
      scala.collection.mutable.Map[Long, scala.collection.mutable.ListBuffer[CountDate]]())

    availableRowsForId1.foreach(o => {
      val id2 = o.getAs("id2").asInstanceOf[Long]
      val currentRowCountDateTime = rowsMap.get(id2)
      var restriction1 = false
      var restriction2 = false
      val localCount: Long = 1
      val dateTime = o.getAs("dateTime").asInstanceOf[java.sql.Timestamp]
      if (currentRowCountDateTime.isDefined) {
        // business logic that checks restriction1 & restriction2 based on currentRowCountDateTime
      }
      if (!restriction1 && !restriction2) {
        // BUGFIX: the original did (stringCount + 1).toString, which is string
        // concatenation ("5" + 1 == "51"); parse before incrementing.
        val set = RedisClient.setCount(
          id2.toString,
          (o.getAs("redisCount").asInstanceOf[String].toLong + 1).toString)
        // PERF: the original rebuilt an immutable Seq via `union` on every row —
        // O(n) per append, O(n^2) per key overall, and the likely cause of the
        // 2-hour stall once the overwrite bug was fixed and sequences actually
        // accumulated. An in-place ListBuffer append is O(1).
        val bucket = rowsMap.getOrElseUpdate(
          id2, scala.collection.mutable.ListBuffer[CountDate]())
        bucket += CountDate(localCount, dateTime)
        rowResult += RowFactory.create(id1, id2.toString)
      }
    })
  })
  // No extra toList copy; ListBuffer already provides an iterator.
  rowResult.iterator
})(encoder)
// OLD version: ran quickly but had a bug — `rowsMap` was created fresh for every
// input row, so previously accumulated entries for an id1 were overwritten
// instead of updated (kept here for reference; the known bug is marked below).
val result: Dataset[Row] = rowsToCompute.mapPartitions(iterator => {
  // One accumulated (count, event-time) observation for a given id2.
  case class CountDate(count: Long, dateTime: java.sql.Timestamp)
  // NOTE(review): a concurrent map is unnecessary — mapPartitions runs
  // single-threaded per partition — but it mirrors the original structure.
  // (The scrape's `new ConcurrentMap[...]` cannot compile: it is an interface.)
  val id1CountDateMap =
    new java.util.concurrent.ConcurrentHashMap[String, scala.collection.mutable.Map[Long, Seq[CountDate]]]()
  var rowResult: ListBuffer[Row] = ListBuffer()

  // Configure the per-executor Redis client from broadcast values.
  RedisClient.writeHost = writeHost.value
  RedisClient.readHost = readHost.value
  RedisClient.port = port.value
  RedisClient.keyTTL = keyTTL.value
  RedisClient.writePoolConfig.setMaxIdle(maxIdle.value)
  RedisClient.writePoolConfig.setMaxTotal(writeMaxTotal.value)
  RedisClient.writePoolConfig.setMaxWaitMillis(poolWaitTimeout.value)
  RedisClient.readPoolConfig.setMaxIdle(maxIdle.value)
  RedisClient.readPoolConfig.setMaxWaitMillis(poolWaitTimeout.value)
  RedisClient.readPoolConfig.setMaxTotal(readMaxTotal.value)

  iterator.foreach(row => {
    val id1 = row.getAs("id1").asInstanceOf[String]
    val availableRowsForId1 = row.getAs("rows").asInstanceOf[Seq[Row]]
    // KNOWN BUG: a brand-new map is built per input row; putIfAbsent keeps the
    // stored one, but the code below keeps mutating/replacing with this fresh
    // map, discarding anything accumulated earlier for the same id1.
    var rowsMap = scala.collection.mutable.Map[Long, Seq[CountDate]]()
    id1CountDateMap.putIfAbsent(id1, rowsMap)
    availableRowsForId1.foreach(o => {
      val id2 = o.getAs("id2").asInstanceOf[Long]
      // Safe: putIfAbsent above guarantees a mapping exists for id1.
      val currentRowCountDateTime = id1CountDateMap.get(id1).get(id2)
      var restriction1 = false
      var restriction2 = false
      var localCount: Long = 1
      var dateTime = o.getAs("dateTime").asInstanceOf[java.sql.Timestamp]
      if (currentRowCountDateTime.isDefined) {
        // business logic that checks restriction1 & restriction2 based on currentRowCountDateTime
      }
      if (!restriction1 && !restriction2) {
        // NOTE(review): string "+ 1" concatenates ("5" + 1 == "51"); preserved
        // as-is because this snippet documents the old behavior.
        val set = RedisClient.setCount(id2.toString, (o.getAs("redisCount").asInstanceOf[String] + 1).toString)
        val newCountDate = CountDate(localCount, dateTime)
        var newCountDateSeq = Seq(newCountDate)
        if (currentRowCountDateTime.nonEmpty) {
          newCountDateSeq = currentRowCountDateTime.get.union(Seq(newCountDate))
        }
        rowsMap.update(id2, newCountDateSeq)
        // Clobbers whatever was stored for id1 with the per-row map (the bug).
        id1CountDateMap.replace(id1, rowsMap)
        rowResult += RowFactory.create(id1, id2.toString)
      }
    })
  })
  rowResult.toList.iterator
})(encoder)
任务指标摘要
解决方法
暂未找到可以解决该程序问题的有效方法,小编正在努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)