我不明白为什么最后阶段没有任何保存或追加数据操作

问题描述

      df.writeStream
      .foreachBatch((batchDF: DataFrame,batchId: Long) =>
        batchDF.write
          .format("org.apache.spark.sql.cassandra")
          .cassandraFormat(cassandratable,cassandraKeyspace,cassandraCluster)
          .mode("append")
          .save())
      .option("checkpointLocation",checkpointDir)
      .start()
      .awaitTermination()

代码末尾,数据帧被写入cassandra表中。

在检查了UI的最后一个阶段后,没有任何部分可以执行保存/追加数据。

enter image description here

我想知道为什么它不存在或者我错过了什么。

==========================更改我的代码后================ ==========

      .writeStream
//      .foreachBatch((batchDF: DataFrame,batchId: Long) =>
//        batchDF.write
//          .format("org.apache.spark.sql.cassandra")
//          .cassandraFormat(cassandratable,cassandraCluster)
//          .mode("append")
//          .save())
      .cassandraFormat(cassandratable,cassandraCluster)
      .option("checkpointLocation",checkpointDir)
      .start()
      .awaitTermination()

enter image description here

但是我可以在sql选项卡中看到WritetoDataSourceV2。

enter image description here

解决方法

也许不是直接回答您的问题,但是对于Spark 3.0和SCC 3.0.0(您应使用3.0.0-beta),您不应使用foreachBatch,而只需通过指定Cassandra格式按原样写入数据-本地支持SCC 2.5.0 Spark结构化流-请参见公告:https://www.datastax.com/blog/2020/05/advanced-apache-cassandra-analytics-now-open-all