问题描述
我在同一台计算机上本地运行Kafka,Spark和Cassandra。
每隔1分钟,我会将一些数据流式传输到Kafka主题中(键是以秒为单位的unix纪元,值是CPU温度),并将最新的偏移量读取到PySpark流式传输DataFrame中。经过一些基本的列转换后,我试图将这些数据附加到Cassandra表的末尾。 PySpark脚本似乎可以正确执行,但是没有数据写入Cassandra。我可能做错了什么?
我一直关注:(i)(How to write streaming Dataset to Cassandra?),(ii)(Cassandra Sink for PySpark Structured Streaming from Kafka topic)和(iii)Writing Spark streaming PySpark dataframe to Cassandra overwrites table instead of appending
CQL DDL创建表
CREATE TABLE cputemperature (
key bigint,value double,topic text,partition int,offset bigint,timestamp timestamp,timestampType int,PRIMARY KEY (value,offset)
)
WITH CLUSTERING ORDER BY (offset ASC);
PySpark脚本
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
import sys
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
spark.sparkContext.setLogLevel('OFF')
def helper_spark_source_kafka():
'''
This function reads latest offset of Kafka stream into a PySpark streaming dataframe,performs
column data type conversions and returns streaming dataframe.
'''
try:
df1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("startingOffsets","latest").option("subscribe","topic1").load()
df2 = df1.withColumn("key",df1["key"].cast("string")).withColumn("value",df1["value"].cast("string"))
df3 = df2.withColumn("key",df2["key"].cast("long")).withColumn("value",df2["value"].cast("double"))
df4 = df3.withColumnRenamed("timestampType","timestamptype")
except:
print("Unexpected error:",sys.exc_info()[0])
print("Unexpected error:",sys.exc_info()[1])
print("Unexpected error:",sys.exc_info()[2])
sys.exit(1)
return df4
def helper_spark_sink_cassandra(write_df,epoch_id):
'''
This function defines the Cassandra sink for PySpark streaming dataframe.
'''
write_df.write \
.format("org.apache.spark.sql.cassandra") \
.options(table="cputemperature",keyspace="kafkaspark")\
.mode("append")
if __name__ == '__main__':
streaming_df = helper_spark_source_kafka()
# query_console = streaming_df.writeStream.format("console")
# query_console.start().awaitTermination()
query_cassandra = streaming_df.writeStream \
.trigger(processingTime="60 seconds") \
.outputMode("update") \
.foreachBatch(helper_spark_sink_cassandra)
query_cassandra.start().awaitTermination()
火花提交命令
$SPARK_HOME/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,com.datastax.spark:spark-cassandra-connector_2.12:3.0.0 --conf spark.cassandra.connection.host=127.0.0.1,spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions $HOME/cputemplogs/pysparkscript.py
卡桑德拉桌上SELECT *
的屏幕截图
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)