scala – 在Spark中计算UDF的调用

使用Spark 1.6.1我想调用UDF的调用次数.我想这样做是因为我有一个非常昂贵的UDF(每次调用大约1秒),我怀疑UDF被调用次数比我数据帧中的记录数要多,这使得我的spark工作速度慢于必要.

虽然我无法重现这种情况,但我想出了一个简单的例子,显示对UDF的调用次数似乎与行数不同(此处:更少),这怎么可能?

import org.apache.spark.sql.sqlContext
import org.apache.spark.{SparkConf,SparkContext}
import org.apache.spark.sql.functions.udf

object Demo extends App {
  val conf = new SparkConf().setMaster("local[4]").setAppName("Demo")
  val sc = new SparkContext(conf)
  sc.setLogLevel("WARN")
  val sqlContext = new sqlContext(sc)
  import sqlContext.implicits._


  val callCounter = sc.accumulator(0)

  val df= sc.parallelize(1 to 10000,numSlices = 100).toDF("value")

  println(df.count) //  gives 10000

  val myudf = udf((d:Int) => {callCounter.add(1);d})

  val res = df.withColumn("result",myudf($"value")).cache

  println(res.select($"result").collect().size) // gives 10000
  println(callCounter.value) // gives 9941

}

如果使用累加器不是调用UDF计数的正确方法,我还能怎样做呢?

注意:在我的实际Spark-Job中,获得的呼叫计数大约是实际记录数的1.7倍.

解决方法

Spark应用程序应定义main()方法,而不是扩展scala.App. scala.App的子类可能无法正常工作.

import org.apache.spark.sql.sqlContext
import org.apache.spark.{SparkConf,SparkContext}
import org.apache.spark.sql.functions.udf

object Demo extends App {
    def main(args: Array[String]): Unit = {
         val conf = new SparkConf().setAppName("Simple Application").setMaster("local[4]")
         val sc = new SparkContext(conf)
         // [...]
    }   
}

这应该可以解决您的问题.

相关文章

共收录Twitter的14款开源软件,第1页Twitter的Emoji表情 Tw...
Java和Scala中关于==的区别Java:==比较两个变量本身的值,即...
本篇内容主要讲解“Scala怎么使用”,感兴趣的朋友不妨来看看...
这篇文章主要介绍“Scala是一种什么语言”,在日常操作中,相...
这篇文章主要介绍“Scala Trait怎么使用”,在日常操作中,相...
这篇文章主要介绍“Scala类型检查与模式匹配怎么使用”,在日...