Spark Streaming Dataset Cassandra connection not supported

Problem description

I am trying to write a streaming Dataset to Cassandra.

I have a streaming Dataset of the following case class:

case class UserSession(var id: Int, var visited: List[String])

I also have the following keyspace/table in Cassandra (blog = keyspace, session = table):

CREATE KEYSPACE blog WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };

CREATE TABLE blog.session(id int PRIMARY KEY, visited list<text>);

I chose list<text> for visited because visited has type List[String].

My ForeachWriter is the following:

import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.ForeachWriter

class SessionCassandraForeachWriter extends ForeachWriter[UserSession] {

  /*
    - on every batch, on every partition `partitionId`
      - on every "epoch" = chunk of data
        - call the open method; if false, skip this chunk
        - for each entry in this chunk, call the process method
        - call the close method either at the end of the chunk or with an error if it was thrown
   */

  val keyspace = "blog"
  val table = "session"
  // sparkSession is the SparkSession built in main
  val connector = CassandraConnector(sparkSession.sparkContext.getConf)

  override def open(partitionId: Long, epochId: Long): Boolean = {
    println("Open connection")
    true
  }

  override def process(sess: UserSession): Unit = {
    connector.withSessionDo { session =>
      // both columns must be listed, and the Scala list has to be rendered
      // as a CQL list literal, e.g. ['a', 'b']
      session.execute(
        s"""
           |insert into $keyspace.$table("id", "visited")
           |values (${sess.id}, ${sess.visited.map(v => s"'$v'").mkString("[", ", ", "]")})
         """.stripMargin)
    }
  }

  override def close(errorOrNull: Throwable): Unit = println("Closing connection")
}

Looking at my process function may help, since this may be what is throwing the error. My main is the following, where

finishedUserSessionsStream: Dataset[UserSession]

def main(args: Array[String]): Unit = {
  // make finishedUserSessionsStream ...

  finishedUserSessionsStream.writeStream
    .option("checkpointLocation", "checkpoint")
    .foreach(new SessionCassandraForeachWriter)
    .start()
    .awaitTermination()
}

This gives me the following error:

org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:431)

Solution

For Spark 3.0 and Spark Cassandra Connector 3.0.0 you shouldn't use foreach; that was the workaround from before SCC had native support for writing streaming Datasets. Starting with SCC 2.5.0 you can write data directly to Cassandra, like this (full example here):

val query = streamingCountsDF.writeStream
  .outputMode(OutputMode.Update)
  .format("org.apache.spark.sql.cassandra")
  .option("checkpointLocation", "checkpoint")
  .option("keyspace", "ks")
  .option("table", "table")
  .start()
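
Adapted to the Dataset from the question, it would look roughly like this (a sketch assuming the blog.session table above; the OutputMode import and the awaitTermination call are only filled in for completeness):

import org.apache.spark.sql.streaming.OutputMode

// write the streaming Dataset[UserSession] straight to blog.session;
// the case class fields (id, visited) must match the table's column names
val query = finishedUserSessionsStream.writeStream
  .outputMode(OutputMode.Update)
  .format("org.apache.spark.sql.cassandra")
  .option("checkpointLocation", "checkpoint")
  .option("keyspace", "blog")
  .option("table", "session")
  .start()

query.awaitTermination()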

You also need to switch to SCC 3.0.0-beta, which contains a lot of fixes.
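
A minimal sketch of that switch, assuming an sbt build and a Cassandra node reachable on localhost (the version and host strings are only illustrative):

// build.sbt: depend on the connector release that ships the streaming sink
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "3.0.0-beta"

// application code: point the connector at the cluster before starting the query
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("user-sessions")
  .config("spark.cassandra.connection.host", "127.0.0.1")
  .getOrCreate()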
