无法使用 Spotify docker 客户端启动 docker 容器

问题描述

我正在编写集成测试用例以从 Amazon SQS 发送和接收消息。为此,我从图像 s12v/elasticmq:0.13.2 运行 docker 容器。为了启动容器,我使用了 Spotify docker 客户端。

依赖:-

    "com.whisk" %% "docker-testkit-scalatest" % "0.9.9" % "test","com.whisk" %% "docker-testkit-impl-spotify" % "0.9.9" % "test"

DockerSqsService 特征代码看起来像

import com.spotify.docker.client.{DefaultDockerClient,DockerClient}
import com.whisk.docker.DockerReadyChecker.LogLineContains
import com.whisk.docker._
import com.whisk.docker.impl.spotify.SpotifyDockerFactory
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{BeforeAndAfterall,Suite}

import scala.concurrent.duration._

trait DockerSqsService
  extends Suite
    with DockerKit
    with BeforeAndAfterall
    with ScalaFutures
    with PatienceConfig {

  val defaultSqsPort = 9324
  val sqsTopicArn = "sqs-arn"
  val awsAccessKey = "access-key"
  val awsSecretKey = "secret-key"

  val sqsContainer: DockerContainer = DockerContainer("s12v/elasticmq:0.13.2")
    .withPorts(defaultSqsPort -> None)
    .withReadyChecker(
      LogLineContains("SQS server has started")
        .looped(15,1.second)
    )

  val sqsPort: Int = {
    sqsContainer
      .getPorts()
      .map(_.apply(defaultSqsPort))
      .futureValue
  }
  val sqsEndpoint = s"http://localhost:$sqsPort"

  private val client: DockerClient = DefaultDockerClient.fromEnv().build()

  override implicit val dockerFactory: DockerFactory = new SpotifyDockerFactory(client)

  override lazy val containerManager = new DockerContainerManager(dockerContainers,dockerFactory.createExecutor())

  abstract override def dockerContainers: List[DockerContainer] =
    sqsContainer :: super.dockerContainers

  override def beforeAll(): Unit = {
    super.beforeAll()
  }
}

然后将此特征与包含 AWS 测试和设置的所有配置的基本特征混合在一起。 BaseSetup 特征是

import java.net.URI

import akka.actor.ActorSystem
import com.amazonaws.auth.{AWsstaticCredentialsProvider,BasicAWSCredentials}
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.services.sqs.{AmazonSQSAsync,AmazonSQSAsyncclientBuilder}
import com.github.matsluni.akkahttpspi.AkkaHttpClient
import fileuploader.gateway.{AWSUtils,SqsClient}
import org.mockito.Mockito.verifyNoMoreInteractions
import org.scalamock.scalatest.MockFactory
import org.scalatest.concurrent.Eventually
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.{BeforeAndAfterEach,Inside}
import org.slf4j.LoggerFactory
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials,StaticCredentialsProvider}
import software.amazon.awssdk.http.async.SdkAsyncHttpClient
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sqs.SqsAsyncclient
import software.amazon.awssdk.utils.AttributeMap

trait BaseSetup extends AnyFlatSpec
  with MockFactory
  with Matchers
  with Inside
  with Eventually
  with BeforeAndAfterEach
  with DockerSqsService {

  private val logger = LoggerFactory.getLogger(getClass)
  implicit val actorSystem: ActorSystem = ActorSystem("sqs-client-actor-system")
  val httpClient: SdkAsyncHttpClient = AkkaHttpClient
    .builder()
    .buildWithDefaults(AttributeMap.empty())
  val credentialsProvider: StaticCredentialsProvider = StaticCredentialsProvider.create(
    AwsBasicCredentials.create(awsAccessKey,awsSecretKey)
  )
  implicit val asyncSqsClient: SqsAsyncclient =
    SqsAsyncclient
      .builder()
      .credentialsProvider(credentialsProvider)
      .region(Region.EU_CENTRAL_1)
      .httpClient(httpClient)
      .overrideConfiguration(AWSUtils.overrideConfig)
      .build()

  val sqsClient = new SqsClient(sqsTopicArn)

  // Client to create queue in SQS
  val sqsAsyncclient: AmazonSQSAsync = {
    val sqsClientBuilder: AmazonSQSAsyncclientBuilder = AmazonSQSAsyncclientBuilder.standard()
    val awsstaticCredentialsProvider = new AWsstaticCredentialsProvider(
      new BasicAWSCredentials(awsAccessKey,awsSecretKey)
    )
    sqsClientBuilder.withCredentials(
      awsstaticCredentialsProvider
    )

    sqsClientBuilder.withEndpointConfiguration(
      new EndpointConfiguration(sqsEndpoint,Region.EU_CENTRAL_1.toString)
    )
    sqsClientBuilder.build()
  }

  override def beforeAll(): Unit = {
    initialize()
  }

  override def afterEach(): Unit = {
    verifyNoMoreInteractions(sqsClient)
  }

  override def afterall(): Unit = {
    actorSystem.terminate()
  }

  protected[fileuploader] def createSqsQueue(sqsAsync: AmazonSQSAsync,queueName: String): String = {
    val url = new URI(sqsAsync.createQueue(queueName).getQueueUrl)
    val sqsEndpointUrl = s"$sqsEndpoint${url.getPath}"
    logger.info(sqsEndpointUrl)
    sqsEndpointUrl
  }

  protected[fileuploader] def initialize(): Unit = {
    createSqsQueue(sqsAsyncclient,"file_uploader")
  }
}

最终,这个 BaseSetup 特性与我的规范文件混淆,即

class SqsClientBase extends BaseSetup {

  "SqsClient" should "restart if it fails to connect to SQS" in {
    //given
    //when
    //then
  }
}

所以,我现在尝试运行这个规范,在任何测试用例运行之前,我收到一个错误

An exception or error caused a run to abort. 
java.lang.NullPointerException
    at fileuploader.util.DockerSqsService.containerManager(DockerSqsService.scala:43)
    at fileuploader.util.DockerSqsService.containerManager$(DockerSqsService.scala:43)
    at fileuploader.gateway.SqsClientBase.containerManager$lzycompute(SqsClientBase.scala:6)
    at fileuploader.gateway.SqsClientBase.containerManager(SqsClientBase.scala:6)
    at com.whisk.docker.DockerKit.getContainerState(DockerKit.scala:36)
    at com.whisk.docker.DockerKit.getContainerState$(DockerKit.scala:35)
    at fileuploader.gateway.SqsClientBase.getContainerState(SqsClientBase.scala:6)
    at com.whisk.docker.DockerKit.containerToState(DockerKit.scala:40)
    at com.whisk.docker.DockerKit.containerToState$(DockerKit.scala:39)
    at fileuploader.gateway.SqsClientBase.containerToState(SqsClientBase.scala:6)
    at fileuploader.util.DockerSqsService.$init$(DockerSqsService.scala:32)
    at fileuploader.gateway.SqsClientBase.<init>(SqsClientBase.scala:6)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.lang.class.newInstance(Class.java:442)
    at org.scalatest.tools.Runner$.genSuiteConfig(Runner.scala:1402)
    at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$8(Runner.scala:1199)
    at scala.collection.immutable.List.map(List.scala:246)
    at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1198)
    at org.scalatest.tools.Runner$.$anonfun$runoptionallyWithPassFailReporter$24(Runner.scala:993)
    at org.scalatest.tools.Runner$.$anonfun$runoptionallyWithPassFailReporter$24$adapted(Runner.scala:971)
    at org.scalatest.tools.Runner$.withClassLoaderAnddispatchReporter(Runner.scala:1480)
    at org.scalatest.tools.Runner$.runoptionallyWithPassFailReporter(Runner.scala:971)
    at org.scalatest.tools.Runner$.run(Runner.scala:798)
    at org.scalatest.tools.Runner.run(Runner.scala)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:133)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:27)

我完全不知道如何解决这个问题。这里的任何指示都会非常有帮助。 TIA

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)