问题描述
df.writeStream
.foreachBatch((batchDF: DataFrame,batchId: Long) =>
batchDF.write
.format("org.apache.spark.sql.cassandra")
.cassandraFormat(cassandratable,cassandraKeyspace,cassandraCluster)
.mode("append")
.save())
.option("checkpointLocation",checkpointDir)
.start()
.awaitTermination()
在代码末尾,数据帧被写入cassandra表中。
在检查了UI的最后一个阶段后,没有任何部分可以执行保存/追加数据。
我想知道为什么它不存在或者我错过了什么。
==========================更改我的代码后================ ==========
.writeStream
// .foreachBatch((batchDF: DataFrame,batchId: Long) =>
// batchDF.write
// .format("org.apache.spark.sql.cassandra")
// .cassandraFormat(cassandratable,cassandraCluster)
// .mode("append")
// .save())
.cassandraFormat(cassandratable,cassandraCluster)
.option("checkpointLocation",checkpointDir)
.start()
.awaitTermination()
但是我可以在sql选项卡中看到WritetoDataSourceV2。
解决方法
也许不是直接回答您的问题,但是对于Spark 3.0和SCC 3.0.0(您应使用3.0.0-beta),您不应使用foreachBatch,而只需通过指定Cassandra格式按原样写入数据-本地支持SCC 2.5.0 Spark结构化流-请参见公告:https://www.datastax.com/blog/2020/05/advanced-apache-cassandra-analytics-now-open-all