Scala Spark NotSerializableException with FunSpec and MockFactory

Problem description

Hi, I am trying to write Scala test code using FunSpec and MockFactory. Somehow, when I use a case class, I cannot figure out which class needs to be serializable.

I get the following exceptions:

  1. Caused by: java.io.NotSerializableException: org.scalamock.function.StubFunction1

  2. org.apache.spark.SparkException: Task not serializable

  3. java.lang.NullPointerException

I created a stub for the AwsUtil class:

import java.io.{BufferedInputStream, InputStream}
import java.nio.charset.{Charset, CodingErrorAction}

import com.amazonaws.services.s3.AmazonS3Client

case class AwsUtil() {

  // Streams an S3 object and returns its lines, ignoring malformed UTF-8 input.
  def getObject(bucket: String, key: String): Iterator[String] = {
    val s3 = new AmazonS3Client()
    println(s"from $bucket, $key")
    val is: InputStream = s3.getObject(bucket, key).getObjectContent

    val decoder = Charset.forName("UTF-8").newDecoder()
    decoder.onMalformedInput(CodingErrorAction.IGNORE)

    scala.io.Source.fromInputStream(new BufferedInputStream(is))(decoder).getLines()
  }
}
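It is worth noting which pieces here are actually serializable: a Scala case class like AwsUtil already mixes in Serializable, so a real instance round-trips through Java serialization; it is the ScalaMock-generated stub, whose handlers are org.scalamock.function.StubFunction1 instances, that does not. A quick sketch to see this outside of Spark (the object name is made up for illustration):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Illustrative check: Java-serialize an object the same way Spark's task
// launcher ultimately does. AwsUtil() passes; a stub[AwsUtil] is expected to
// throw java.io.NotSerializableException on its StubFunction1 handlers.
object SerializationCheck {
  def check(obj: AnyRef): Unit = {
    val oos = new ObjectOutputStream(new ByteArrayOutputStream())
    try oos.writeObject(obj) finally oos.close()
  }

  def main(args: Array[String]): Unit = {
    check(AwsUtil()) // completes without throwing
  }
}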

The test class code:

import java.io.File

import com.amazonaws.services.s3.AmazonS3
import io.findify.s3mock.S3Mock
import org.junit.runner.RunWith
import org.scalamock.scalatest.MockFactory
// ScalaTest 3.0.x-style imports (FunSpec, org.scalatest.junit.JUnitRunner).
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSpec}

@RunWith(classOf[JUnitRunner])
class TestUploader extends FunSpec with BeforeAndAfterAll with BeforeAndAfterEach
    with MockFactory with SparkSessionTestWrapper {

  // SparkSessionTestWrapper is assumed to provide `spark`; `baseResourcePath`
  // comes from the surrounding test setup.
  var s3Client: AmazonS3 = _
  var awsUtilStub: AwsUtil = _
  val s3MockApi = S3Mock(port = 8001, dir = "/tmp/s3")

  s3MockApi.start

  override def beforeAll(): Unit = {
    super.beforeAll()
  }

  override def afterAll(): Unit = {
    super.afterAll()
  }

  override def beforeEach(): Unit = {
    super.beforeEach()
  }

  override def afterEach(): Unit = {
    super.afterEach()
  }

  describe("HappyPath: ") {
    it("Initializing the AWS Util object to get Google Write Data") {

      val uploader = new Uploader

      val listOfFileNames = List("fileName.log")
      val it = Iterator("a", "number", "of", "words")
      val awsUtilStub: AwsUtil = stub[AwsUtil]

      val outputPath = new File(baseResourcePath, "testoutput").getAbsolutePath
      (awsUtilStub.getObject _).when("input", "fileName.log").returns(it)

      val a = uploader.writeData(listOfFileNames, "input", outputPath, spark, awsUtilStub)

      println("a===>" + a)
      s3MockApi.stop
    }
  }
}
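One aside about the stub setup above: .returns(it) hands back the same Iterator instance on every call, and an iterator is empty once consumed. If getObject can be invoked more than once, ScalaMock's onCall builds a fresh value per invocation; a small sketch of that variant:

// Sketch: produce a fresh iterator on each stubbed call instead of reusing
// one consumable Iterator instance.
(awsUtilStub.getObject _)
  .when("input", "fileName.log")
  .onCall { (_: String, _: String) => Iterator("a", "number", "of", "words") }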

The class under test:

import org.apache.spark.sql.SparkSession

// Pingsuploader and Logging (providing inputDate and logError) come from the
// surrounding codebase.
class Uploader extends Pingsuploader with Logging with Serializable {

  def writeData(files: List[String], bucket: String, outputLocation: String,
                spark: SparkSession, awsUtils: AwsUtil): Long = {
    try {
      println("files " + files.size)
      println("inputDate " + inputDate)
      println("bucket " + bucket)
      println("outputLocation " + outputLocation)
      println("spark " + spark)

      val sc = spark.sparkContext
      val input = sc.parallelize(files, files.size)

      // The flatMap closure captures `awsUtils`, so Spark must serialize it
      // to ship the task to executors.
      val rdd = input.flatMap { key => awsUtils.getObject(bucket, key) }

      rdd.count()
    } catch {
      case e: Exception =>
        logError(e.getMessage)
        throw e
    }
  }
}
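The flatMap above is where the listed exceptions originate: Spark serializes the closure together with the captured awsUtils, and a ScalaMock stub cannot survive that. One way around it, when the file list is small, is to read on the driver and parallelize only plain strings; a minimal sketch under that assumption (writeDataDriverSide is a hypothetical name, not the post's code):

import org.apache.spark.sql.SparkSession

// Hypothetical variant: call awsUtils on the driver, so no non-serializable
// object is captured in a task closure. Only the resulting strings are shipped.
def writeDataDriverSide(files: List[String], bucket: String,
                        spark: SparkSession, awsUtils: AwsUtil): Long = {
  val lines = files.flatMap(key => awsUtils.getObject(bucket, key))
  spark.sparkContext.parallelize(lines).count()
}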

Solution

No effective solution to this problem has been found yet.
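One commonly suggested direction for this class of failure (offered here as a sketch, not a confirmed fix for this post) is to drop the ScalaMock stub on the Spark path entirely and use a hand-written test double: a case class is already Serializable, so a plain subclass that overrides getObject can cross the closure boundary. The FakeAwsUtil name is hypothetical:

// Hypothetical serializable test double: unlike a ScalaMock stub, it holds
// only a Map of plain strings, so Spark can serialize the flatMap closure.
class FakeAwsUtil(data: Map[(String, String), List[String]]) extends AwsUtil {
  override def getObject(bucket: String, key: String): Iterator[String] =
    data.getOrElse((bucket, key), List.empty).iterator
}

// Usage in the test instead of stub[AwsUtil]:
val fake = new FakeAwsUtil(
  Map(("input", "fileName.log") -> List("a", "number", "of", "words")))

If extending a case class feels off (it is legal in Scala 2 but usually discouraged), turning AwsUtil into a plain class or a trait avoids that wrinkle.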
