用于多个测试类的MiniDFS群集设置会抛出java.net.BindException:地址已在使用中

问题描述

我正在为Spark代码编写单元测试用例,该案例可从hdfs文件和spark的目录中读取/写入数据。为此,我创建了一个单独的特征,该特征提供minidfs集群的初始化,并且在创建SparkSession对象时,将生成的hdfs uri的值用于-spark.sql.warehouse.dir。这是它的代码-

trait TestSparkSession extends BeforeAndAfterall {
  self: Suite =>

  var hdfsCluster: MiniDFSCluster = _

  def nameNodeURI: String = s"hdfs://localhost:${hdfsCluster.getNameNodePort}/"

  def withLocalSparkSession(tests: SparkSession => Any): Any = {
    val baseDir = new File(PathUtils.getTestDir(getClass),"miniHDFS")
    val conf = new HdfsConfiguration()
    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR,baseDir.getAbsolutePath)
    val builder = new MiniDFSCluster.Builder(conf)
    hdfsCluster = builder.nameNodePort(9000)
      .manageNameDfsDirs(true)
      .manageDataDfsDirs(true)
      .format(true)
      .build()
    hdfsCluster.waitClusterUp()

    val testSpark = SparkSession
      .builder()
      .master("local")
      .appName("Test App")
      .config("spark.sql.warehouse.dir",s"${nameNodeURI}spark-warehouse/")
      .getorCreate()
    tests(testSpark)  
  }

  def stopHdfs(): Unit = hdfsCluster.shutdown(true,true)

  override def afterall(): Unit = stopHdfs()

}

在编写测试时,我会继承此特征,然后编写测试用例,例如-

class SampleSpec extends FunSuite with TestSparkSession {
     withLocalSparkSession {
         testSpark =>
         import testSpark.implicits._

         // Test 1 Here
         // Test 2 Here
     }
}

当我一次运行测试类时,一切都正常。但是,一次全部运行它们会抛出java.net.BindException: Address already in use。 这应该意味着在执行下一组测试时,已经创建的hdfsCluster尚未关闭。这就是为什么它无法创建另一个绑定到同一端口的端口的原因。但是然后在afterall()中,我停止了hfdsCluster。

我的问题是我可以共享hdfs群集的单个实例并触发会话,而不是每次都初始化它吗?我试图提取方法之外的初始化,但是它仍然抛出相同的异常。即使我不能共享它,如何在下一个测试类执行时正确停止群集并重新初始化它?

另外,请让我知道我编写使用SparkSession和HDFS存储的“单元”测试用例的方法是否正确。

任何帮助将不胜感激。

解决方法

我通过在伴侣对象中创建hdfs集群来解决该问题,以便为所有测试服创建一个单独的实例。