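Problem description
I am trying to write a streaming Dataset to Cassandra.
I have a streaming Dataset of the following class:
case class UserSession(var id: Int, var visited: List[String])
I also have the following keyspace/table in Cassandra (blog = keyspace, session = table):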
CREATE KEYSPACE blog WITH REPLICATION = { 'class' : 'SimpleStrategy','replication_factor' : 1 };
CREATE TABLE blog.session(id int PRIMARY KEY,visited list<text>);
I chose list<text> for visited because visited is of type List[String].
My foreach writer is as follows:
class SessionCassandraForeachWriter extends ForeachWriter[UserSession] {
  /*
    - on every batch, on every partition `partitionId`
      - on every "epoch" = chunk of data
        - call the open method; if false, skip this chunk
        - for each entry in this chunk, call the process method
        - call the close method either at the end of the chunk or with an error if it was thrown
  */
  val keyspace = "blog"
  val table = "session"
  val connector = CassandraConnector(sparkSession.sparkContext.getConf)

  override def open(partitionId: Long, epochId: Long): Boolean = {
    println("Open connection")
    true
  }

  override def process(sess: UserSession): Unit = {
    connector.withSessionDo { session =>
      session.execute(
        s"""
           |insert into $keyspace.$table("id")
           |values (${sess.id},${sess.visited})
         """.stripMargin)
    }
  }

  override def close(errorOrNull: Throwable): Unit = println("Closing connection")
}
It may help to look at my process function, since that may be what throws the error. My main is as follows.
finishedUserSessionsStream: Dataset[UserSession]
def main(args: Array[String]): Unit = {
  /// make finishedUserSessionStreams.....
  finishedUserSessionsStream.writeStream
    .option("checkpointLocation", "checkpoint")
    .foreach(new SessionCassandraForeachWriter)
    .start()
    .awaitTermination()
}
This gives me the following error:
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:431)
Solution
With Spark 3.0 and Spark Cassandra Connector (SCC) 3.0.0, you should not use foreach; it was only needed with earlier SCC versions. Starting with SCC 2.5.0, you can write streaming data directly to Cassandra, like this:
val query = streamingCountsDF.writeStream
  .outputMode(OutputMode.Update)
  .format("org.apache.spark.sql.cassandra")
  .option("checkpointLocation", "checkpoint")
  .option("keyspace", "ks")
  .option("table", "table")
  .start()
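For reference, here is a minimal sketch of the same pattern pointed at the blog.session table from the question. It is an illustration under assumptions, not the asker's actual job: the object name SessionsToCassandra is hypothetical, the built-in rate source stands in for finishedUserSessionsStream, Cassandra is assumed to be reachable at 127.0.0.1, Append mode is assumed to suit the query, and the Spark and SCC jars are assumed to be on the classpath.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array, col}
import org.apache.spark.sql.streaming.OutputMode

// Hypothetical driver object, for illustration only.
object SessionsToCassandra {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("sessions-to-cassandra")
      // Assumption: a Cassandra node is listening on localhost.
      .config("spark.cassandra.connection.host", "127.0.0.1")
      .getOrCreate()

    // Stand-in for the real stream: the rate source emits (timestamp, value),
    // which is reshaped here into the (id int, visited list<text>) layout.
    val sessions = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "1")
      .load()
      .select(
        col("value").cast("int").as("id"),
        array(col("value").cast("string")).as("visited"))

    // Write straight to Cassandra through the connector's streaming sink,
    // with no ForeachWriter involved.
    sessions.writeStream
      .outputMode(OutputMode.Append)
      .format("org.apache.spark.sql.cassandra")
      .option("checkpointLocation", "checkpoint")
      .option("keyspace", "blog")
      .option("table", "session")
      .start()
      .awaitTermination()
  }
}
```

The DataFrame column names have to match the Cassandra column names (id and visited here), which is why the sketch renames them before writing.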
You also need to switch to SCC 3.0.0-beta, which contains a lot of fixes.
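A side note on the foreach writer in the question, separate from the error shown: the insert statement names only the id column while supplying two values, and interpolating a Scala List into the CQL string yields its toString form rather than a CQL list literal. The small sketch below shows the difference. The helper cqlTextList is hypothetical and exists only to illustrate the literal format; with a real session, a prepared statement with bound values is generally the safer choice.

```scala
object CqlListLiteral {
  // Hypothetical helper: render a Scala list as a CQL list<text> literal,
  // doubling single quotes as CQL string literals require.
  def cqlTextList(values: List[String]): String =
    values.map(v => "'" + v.replace("'", "''") + "'").mkString("[", ", ", "]")

  def main(args: Array[String]): Unit = {
    val visited = List("home", "about")
    // What string interpolation produces: not valid CQL.
    println(s"$visited")          // prints: List(home, about)
    // What a list<text> column expects as a literal.
    println(cqlTextList(visited)) // prints: ['home', 'about']
  }
}
```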