问题描述
我正在为Spark代码编写单元测试用例,该案例可从hdfs文件和spark的目录中读取/写入数据。为此,我创建了一个单独的特征,该特征提供minidfs集群的初始化,并且在创建SparkSession对象时,将生成的hdfs uri的值用于-spark.sql.warehouse.dir
。这是它的代码-
trait TestSparkSession extends BeforeAndAfterall {
self: Suite =>
var hdfsCluster: MiniDFSCluster = _
def nameNodeURI: String = s"hdfs://localhost:${hdfsCluster.getNameNodePort}/"
def withLocalSparkSession(tests: SparkSession => Any): Any = {
val baseDir = new File(PathUtils.getTestDir(getClass),"miniHDFS")
val conf = new HdfsConfiguration()
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR,baseDir.getAbsolutePath)
val builder = new MiniDFSCluster.Builder(conf)
hdfsCluster = builder.nameNodePort(9000)
.manageNameDfsDirs(true)
.manageDataDfsDirs(true)
.format(true)
.build()
hdfsCluster.waitClusterUp()
val testSpark = SparkSession
.builder()
.master("local")
.appName("Test App")
.config("spark.sql.warehouse.dir",s"${nameNodeURI}spark-warehouse/")
.getorCreate()
tests(testSpark)
}
def stopHdfs(): Unit = hdfsCluster.shutdown(true,true)
override def afterall(): Unit = stopHdfs()
}
在编写测试时,我会继承此特征,然后编写测试用例,例如-
class SampleSpec extends FunSuite with TestSparkSession {
withLocalSparkSession {
testSpark =>
import testSpark.implicits._
// Test 1 Here
// Test 2 Here
}
}
当我一次运行测试类时,一切都正常。但是,一次全部运行它们会抛出java.net.BindException: Address already in use
。
这应该意味着在执行下一组测试时,已经创建的hdfsCluster尚未关闭。这就是为什么它无法创建另一个绑定到同一端口的端口的原因。但是然后在afterall()中,我停止了hfdsCluster。
我的问题是我可以共享hdfs群集的单个实例并触发会话,而不是每次都初始化它吗?我试图提取方法之外的初始化,但是它仍然抛出相同的异常。即使我不能共享它,如何在下一个测试类执行时正确停止群集并重新初始化它?
另外,请让我知道我编写使用SparkSession和HDFS存储的“单元”测试用例的方法是否正确。
任何帮助将不胜感激。
解决方法
我通过在伴侣对象中创建hdfs集群来解决该问题,以便为所有测试服创建一个单独的实例。