Problem description
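I am running tests in Scala with Spark and Cassandra. For the tests I use Testcontainers, and for work-related reasons we are not using the Scala variant of Testcontainers. The problem with Testcontainers is that ports are assigned randomly, and I don't know which parameter to use to get the port so that I can connect.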
//code block
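import com.datastax.spark.connector.cql.CassandraConnectorConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.cassandra._
import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfterAll
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatestplus.junit.JUnitRunner
import org.testcontainers.containers.CassandraContainer
import org.testcontainers.containers.wait.strategy.Wait

@RunWith(classOf[JUnitRunner])
class ConnectorSpec extends AnyFlatSpec
with BeforeAndAfterAll {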
val container = new CassandraContainer("cassandra:latest")
container.withExposedPorts(9042)
container.waitingFor(Wait.forListeningPort())
container.start()
val ip = container.getContainerIpAddress() //output localhost
val port = container.getMappedPort(9042)
val cluster = container.getCluster()
val session = cluster.connect()
session.execute("CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class':'SimpleStrategy','replication_factor':'1'};")
session.execute("CREATE TABLE IF NOT EXISTS test.tester (id int PRIMARY KEY,city varchar,role varchar);")
session.execute("INSERT INTO test.tester (id,city,role) VALUES (1,'Jakarta','Devops')")
session.execute("SELECT * FROM test.tester")
assert(container.isRunning)
val spark = SparkSession
.builder()
.appName("ReadCassandra")
.master("local[*]")
.getOrCreate()
spark.setCassandraConf(CassandraConnectorConf.KeepAliveMillisParam.option(10000))
spark.setCassandraConf(cluster.getClusterName(),CassandraConnectorConf.ConnectionHostParam.option("127.0.0.1"))
val df = spark.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "tester","keyspace" -> "test"))
.load()
df.show()
}
#sbt dependency
libraryDependencies += "org.testcontainers" % "cassandra" % "1.15.3" % Test
Testing started at 5:30 PM ...
Connected to the target VM,address: '127.0.0.1:53611',transport: 'socket'
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/kenneth/.cache/coursier/v1/https/repo1.maven.org/maven2/org/slf4j/slf4j-log4j12/1.7.16/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/kenneth/.cache/coursier/v1/https/repo1.maven.org/maven2/org/slf4j/slf4j-nop/1.7.30/slf4j-nop-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
Solution
Your code already has the local port:
val port = container.getMappedPort(9042)
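Testcontainers provides getMappedPort for exactly this purpose: it returns the local (host) port that is mapped to the container port you pass as the argument (in your case, the local port mapped to 9042 inside the Cassandra container).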
Note: you must start the container before calling getMappedPort, but that is already the case in your code.
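To make the connection concrete, here is a minimal sketch of how the mapped port could be handed to the Spark Cassandra Connector. It assumes the connector's standard settings spark.cassandra.connection.host and spark.cassandra.connection.port; the CassandraTestConf object and its sparkOptions method are hypothetical names introduced only for illustration and are not part of Testcontainers or the connector.

```scala
// Hypothetical helper (not a Testcontainers or connector API):
// turns a host and a mapped port into Spark Cassandra Connector settings.
object CassandraTestConf {
  def sparkOptions(host: String, mappedPort: Int): Map[String, String] = Map(
    // Host on which the mapped port is reachable (localhost in the question)
    "spark.cassandra.connection.host" -> host,
    // Randomly assigned host port that forwards to 9042 in the container
    "spark.cassandra.connection.port" -> mappedPort.toString
  )
}
```

In the test from the question, the arguments would presumably be container.getContainerIpAddress() and container.getMappedPort(9042), both read after container.start(). Each resulting key/value pair can then be passed to SparkSession.builder() through .config(key, value) before calling getOrCreate(), so Spark connects to the random host port instead of the default 9042.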