问题描述
我对 Scala 和 Scalatest 非常陌生,但对 Pyspark 有一些经验,我正在尝试从 Spark 的角度学习 Scala。
我目前正试图弄清楚在 Scalatest 中设置和使用设备的正确方法。
我想象这种工作的方式,这可能不是在 Scala 中完成的方式,是我将设置一个 SparkSession
作为在测试套件之间共享的全局夹具,然后可能有几个示例数据集连接到 SparkSession
中,该 BeforeAndAfterall
可用于具有多个测试等的单个测试套件。
目前,我有一些代码可以使用 SparkSession
trait 的共享夹具在同一套件中运行多个测试;但是,如果我同时运行多个套件,首先完成的套件似乎会终止 java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
并且任何进一步的测试都会以 SparkSession
所以,我想知道是否有一种方法可以创建 package com.example.test
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.scalatest._
import org.scalatest.FixtureSuite
import org.scalatest.funsuite.FixtureAnyFunSuite
package testSetup {
trait SparkSetup {
val spark = SparkSession
.builder
.master("local")
.appName(getClass.getSimpleName.replace("$",""))
.getorCreate()
spark.sparkContext.setLogLevel("ERROR")
}
,以便它仅在 所有 运行套件完成后才会停止;或者,如果我在错误的树上吠叫并且完全有更好的方法 - 正如我所说,我对 Scala 很陌生,所以这可能不是你这样做的方式,在这种情况下,非常欢迎替代建议。
首先我有一个包 testSetup 并且我正在为 SparkSession 创建一个特征:
trait TestData extends SparkSetup {
def data(): DataFrame = {
val testDataStruct = StructType(List(
StructField("date",StringType,true),StructField("period",StructField("ID",IntegerType,StructField("SomeText",true)))
val testData = Seq(Row("01012020","10:00",20,"Some Text"))
spark.createDataFrame(spark.sparkContext.parallelize(testData),testDataStruct)
}
}
然后在特征中使用它来设置一些示例数据:
withFixture
然后我将它们放在一起以通过 afterall
运行测试并使用 trait DataFixture extends funsuite.FixtureAnyFunSuite with TestData with BeforeAndAfterall { this: FixtureSuite =>
type FixtureParam = DataFrame
def withFixture(test: OneArgTest) = {
super.withFixture(test.toNoArgTest(data())) // "loan" the fixture to the test
}
override def afterall() {
spark.close()
}
}
}
关闭 SparkSession;这显然是有些地方不太对劲,但我不确定是什么:
package com.example.utilities
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions._
object GeneralTransforms {
def addHashColumn(inputDataFrame: DataFrame,exclusionCols: List[String]): DataFrame = {
val columnsToHash = inputDataFrame.columns.filterNot(exclusionCols.contains(_)).toList
inputDataFrame.withColumn("RowHash",sha2(concat_ws("|",columnsToHash.map(col) : _*),256))
}
}
我目前正在测试一个基本函数来动态散列 DataFrame 中的列,并可以选择排除一些;这是代码:
import testSetup._
import com.example.utilities.GeneralTransforms._
import org.apache.spark.sql.DataFrame
class TestData extends funsuite.FixtureAnyFunSuite with DataFixture {
test("Test data has correct columns") { inputData =>
val cols = inputData.columns.toSeq
val expectedCols = Array("date","period","ID","SomeText").toSeq
assert(cols == expectedCols)
}
}
class TestAddHashColumn extends funsuite.FixtureAnyFunSuite with DataFixture {
test("Test new hash column added") { inputData =>
val hashedDf = addHashColumn(inputData,List())
val initialCols = inputData.columns.toSeq
val cols = hashedDf.columns.toSeq
assert(initialCols.contains("RowHash") == false)
assert(cols.contains("RowHash") == true)
}
test("Test all columns hashed - no exclusion") { inputData =>
val hashedDf = addHashColumn(inputData,List())
val rowHashColumn = hashedDf.select("RowHash").first().getString(0)
val checkString = "01012020|10:00|20|Some Text"
val expectedHash = String.format("%064x",new java.math.BigInteger(1,java.security.MessageDigest.getInstance("SHA-256").digest(checkString.getBytes("UTF-8"))))
assert(rowHashColumn == expectedHash)
}
test("Test all columns hashed - with exclusion") { inputData =>
val excludedColumns = List("ID","SomeText")
val hashedDf = addHashColumn(inputData,excludedColumns)
val rowHashColumn = hashedDf.select("RowHash").first().getString(0)
val checkString = "01012020|10:00"
val expectedHash = String.format("%064x",java.security.MessageDigest.getInstance("SHA-256").digest(checkString.getBytes("UTF-8"))))
assert(rowHashColumn == expectedHash)
}
}
以及当前的测试用例:-
parallelExecution in Test := false
两个测试套件单独运行时都非常好;只有当两者一起运行时,我才会遇到问题。这也可以通过将 SparkSession.builder.getorCreate
添加到我的 build.sbt 来解决,但是当我添加越来越多的测试时,如果能够允许这种情况并行发生会很好。
我还想知道是否可以通过在 BeforeAll/Afterall 中运行一些检查上下文的其他 SparkSession 实例来解决这个问题,但我不确定如何做到这一点,并想先用尽这条途径在我进入另一个兔子洞之前!
编辑
自从发布以来,我花了更多的时间在这上面,并稍微改变了一些东西,用一个帮助类来处理 Spark 设置。在此,我已将其设置为使用 SparkContext
方法创建一个伪主 spark 会话,但随后为实际测试创建了一个新的 spark 会话 - 这将允许我拥有不同的配置,并执行诸如具有不同临时表注册等。但是,我仍然无法解决 spark 关闭的问题 - 显然,如果我对 resultTasks
上的任何正在运行的会话运行 spark.stop() 它将停止所有会话的上下文。>
而且似乎在退出 sbt 之前上下文不会停止?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)