将Spark Streaming PySpark数据帧写入Cassandra会覆盖表格,而不是附加

问题描述

我正在运行Kafka,Spark和Cassandra的1节点集群。所有本地都在同一台计算机上。

通过一个简单的Python脚本,我每5秒将一些伪数据流式传输到Kafka主题中。然后,使用Spark结构化流,将这个数据流(一次一行)读入一个startingOffset = latest的PySpark DataFrame中。最后,我试图将此行追加到已经存在的Cassandra表中。

我一直在关注(How to write streaming Dataset to Cassandra?)和(Cassandra Sink for PySpark Structured Streaming from Kafka topic)。

已将一行数据成功写入Cassandra表中,但我的问题是每次而不是将其附加到表末尾都被覆盖。我可能做错了什么?

这是我的代码:

CQL DDL,用于在Cassandra中创建kafkaspark键空间,后跟randintstream表:

DESCRIBE keyspaces;

CREATE KEYSPACE kafkaspark
  WITH REPLICATION = { 
   'class' : 'SimpleStrategy','replication_factor' : 1 
  };
  
USE kafkaspark; 

CREATE TABLE randIntStream (
    key int,value int,topic text,partition int,offset bigint,timestamp timestamp,timestampType int,PRIMARY KEY (partition,topic)
);

启动PySpark外壳

./bin/pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,com.datastax.spark:spark-cassandra-connector_2.12:3.0.0 --conf spark.cassandra.connection.host=127.0.0.1,spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions

将来自Kafka主题的最新消息阅读到流式DataFrame中:

df = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("startingOffsets","latest").option("subscribe","topic1").load()

一些转换和检查模式:

df2 = df.withColumn("key",df["key"].cast("string")).withColumn("value",df["value"].cast("string"))
df3 = df2.withColumn("key",df2["key"].cast("integer")).withColumn("value",df2["value"].cast("integer"))
df4 = df3.withColumnRenamed("timestampType","timestamptype")
df4.printSchema()

写入Cassandra的功能

def writeToCassandra(writeDF,epochId):
    writeDF.write \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="randintstream",keyspace="kafkaspark") \
    .mode("append") \
    .save()

最后,查询要从Spark写入Cassandra:

query = df4.writeStream \
.trigger(processingTime="5 seconds") \
.outputMode("update") \
.foreachBatch(writeToCassandra) \
.start()

SELECT *在Cassandra中的表格上:

enter image description here

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)