如何在测试套件之间持续存在的 scalatest 中创建共享 SparkSession 夹具?

问题描述

我对 Scala 和 Scalatest 非常陌生,但对 Pyspark 有一些经验,我正在尝试从 Spark 的角度学习 Scala。

我目前正试图弄清楚在 Scalatest 中设置和使用设备的正确方法

我想象这种工作的方式,这可能不是在 Scala 中完成的方式,是我将设置一个 SparkSession 作为在测试套件之间共享的全局夹具,然后可能有几个示例数据集连接到 SparkSession 中,该 BeforeAndAfterall 可用于具有多个测试等的单个测试套件。

目前,我有一些代码可以使用 SparkSession trait 的共享夹具在同一套件中运行多个测试;但是,如果我同时运行多个套件,首先完成的套件似乎会终止 java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext. 并且任何进一步的测试都会以 SparkSession

失败

所以,我想知道是否有一种方法可以创建 package com.example.test import org.apache.spark.sql.SparkSession import org.apache.spark.sql.DataFrame import org.apache.spark.sql.Row import org.apache.spark.sql.types._ import org.scalatest._ import org.scalatest.FixtureSuite import org.scalatest.funsuite.FixtureAnyFunSuite package testSetup { trait SparkSetup { val spark = SparkSession .builder .master("local") .appName(getClass.getSimpleName.replace("$","")) .getorCreate() spark.sparkContext.setLogLevel("ERROR") } ,以便它仅在 所有 运行套件完成后才会停止;或者,如果我在错误的树上吠叫并且完全有更好的方法 - 正如我所说,我对 Scala 很陌生,所以这可能不是你这样做的方式,在这种情况下,非常欢迎替代建议。

首先我有一个包 testSetup 并且我正在为 SparkSession 创建一个特征:

 trait TestData extends SparkSetup {

    def data(): DataFrame = {

      val testDataStruct = StructType(List(
                              StructField("date",StringType,true),StructField("period",StructField("ID",IntegerType,StructField("SomeText",true)))

      val testData = Seq(Row("01012020","10:00",20,"Some Text"))

      spark.createDataFrame(spark.sparkContext.parallelize(testData),testDataStruct)
      
    }
  }

然后在特征中使用它来设置一些示例数据:

withFixture

然后我将它们放在一起以通过 afterall 运行测试并使用 trait DataFixture extends funsuite.FixtureAnyFunSuite with TestData with BeforeAndAfterall { this: FixtureSuite => type FixtureParam = DataFrame def withFixture(test: OneArgTest) = { super.withFixture(test.toNoArgTest(data())) // "loan" the fixture to the test } override def afterall() { spark.close() } } } 关闭 SparkSession;这显然是有些地方不太对劲,但我不确定是什么:

package com.example.utilities

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions._

object GeneralTransforms {
    def addHashColumn(inputDataFrame: DataFrame,exclusionCols: List[String]): DataFrame = {
        
        val columnsToHash = inputDataFrame.columns.filterNot(exclusionCols.contains(_)).toList
        
        inputDataFrame.withColumn("RowHash",sha2(concat_ws("|",columnsToHash.map(col) : _*),256))
    }
}

我目前正在测试一个基本函数来动态散列 DataFrame 中的列,并可以选择排除一些;这是代码

import testSetup._
import com.example.utilities.GeneralTransforms._

import org.apache.spark.sql.DataFrame

class TestData extends funsuite.FixtureAnyFunSuite with DataFixture {
  test("Test data has correct columns") { inputData => 
    val cols = inputData.columns.toSeq
    val expectedCols = Array("date","period","ID","SomeText").toSeq
    
    assert(cols == expectedCols)
  }
}

class TestAddHashColumn extends funsuite.FixtureAnyFunSuite with DataFixture {
  
  test("Test new hash column added") { inputData =>
    val hashedDf = addHashColumn(inputData,List())
    val initialCols = inputData.columns.toSeq
    val cols = hashedDf.columns.toSeq
    
    assert(initialCols.contains("RowHash") == false)
    assert(cols.contains("RowHash") == true)
  }

  test("Test all columns hashed - no exclusion") { inputData =>
    val hashedDf = addHashColumn(inputData,List())
    val rowHashColumn = hashedDf.select("RowHash").first().getString(0)
    val checkString = "01012020|10:00|20|Some Text"
    val expectedHash = String.format("%064x",new java.math.BigInteger(1,java.security.MessageDigest.getInstance("SHA-256").digest(checkString.getBytes("UTF-8"))))

    assert(rowHashColumn == expectedHash)
  }

  test("Test all columns hashed - with exclusion") { inputData =>

    val excludedColumns = List("ID","SomeText")
    val hashedDf = addHashColumn(inputData,excludedColumns)
    val rowHashColumn = hashedDf.select("RowHash").first().getString(0)
    val checkString = "01012020|10:00"
    val expectedHash = String.format("%064x",java.security.MessageDigest.getInstance("SHA-256").digest(checkString.getBytes("UTF-8"))))

    assert(rowHashColumn == expectedHash)

  }
}

以及当前的测试用例:-

parallelExecution in Test := false

两个测试套件单独运行时都非常好;只有当两者一起运行时,我才会遇到问题。这也可以通过将 SparkSession.builder.getorCreate 添加到我的 build.sbt 来解决,但是当我添加越来越多的测试时,如果能够允许这种情况并行发生会很好。

我还想知道是否可以通过在 BeforeAll/Afterall 中运行一些检查上下文的其他 SparkSession 实例来解决这个问题,但我不确定如何做到这一点,并想先用尽这条途径在我进入另一个兔子洞之前!

编辑

自从发布以来,我花了更多的时间在这上面,并稍微改变了一些东西,用一个帮助类来处理 Spark 设置。在此,我已将其设置为使用 SparkContext 方法创建一个伪主 spark 会话,但随后为实际测试创建了一个新的 spark 会话 - 这将允许我拥有不同的配置,并执行诸如具有不同临时表注册等。但是,我仍然无法解决 spark 关闭的问题 - 显然,如果我对 resultTasks 上的任何正在运行的会话运行 spark.stop() 它将停止所有会话的上下文。>

而且似乎在退出 sbt 之前上下文不会停止?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)