创建依赖于其他资源的函数的Spark UDF

问题描述

我有一个用于标记字符串的代码

但是该令牌化方法使用了一些在我的应用程序启动时加载的数据。

val stopwords = getStopwords();

val tokens = tokenize("hello i am good",stopwords)

def tokenize(string:String,stopwords: List[String]) : List[String] = {
   
   val splitted = string.split(" ")

 
   // I use this stopwords for filtering my splitted array.
   // Then i return the items back.
  
}

现在我想使tokenize方法成为Spark的UDF,我想用它在DataFrame Transformations中创建新列。

我创建了简单的UDF,在此之前没有依赖性,例如需要从文本文件等中读取项目。

有人可以告诉我如何进行这种操作吗?

这是我尝试过的方法,并且可以正常工作。

      val moviesDF = Seq(
        ("kingdomofheaven"),("enemyatthegates"),("salesinfointheyearofdecember"),).toDF("column_name")
    
      val tokenizeUDF: UserDefinedFunction = udf(tokenize(_: String): List[String])
    
      moviesDF.withColumn("tokenized",tokenizeUDF(col("column_name"))).show(100,false)
    
    
      def tokenize(name: String): List[String] = {
    
        val wordFreqMap: Map[String,Double] = DataProviderUtil.getWordFreqMap()
        val stopWords: Set[String] = DataProviderUtil.getStopWordSet()
        val maxLengthWord: Int = wordFreqMap.keys.maxBy(_.length).length
    
        .................
        .................
      }

它给了我预期的输出

+----------------------------+--------------------------+
|columnname                  |tokenized                 |
+----------------------------+--------------------------+
|kingdomofheaven             |[kingdom,heaven]         |
|enemyatthegates             |[enemi,gate]             |
|salesinfointheyearofdecember|[sale,info,year,decemb]|
+----------------------------+--------------------------+

现在我的问题是,它部署后是否可以工作?目前我是 在本地运行。我主要担心的是,此函数文件获取停用词,wordfreq等信息,以使 标记化是可能的。所以像这样注册它可以正常工作 ?

解决方法

此时,如果部署此代码,Spark将尝试序列化DataProviderUtil,则需要将该类标记为可序列化。另一种可能性是在对象内部声明逻辑。对象内部的函数被视为静态函数,并且不会序列化。