问题描述
我开发了一个 Python Kafka 生产者,它将多个 json 记录作为 nd-json 二进制字符串发送到 Kafka 主题。 然后我尝试使用 PySpark 在 Spark Structured Streaming 中阅读这些消息,如下所示:
events_df = select(from_json(col("value").cast("string"),schema).alias("value"))
但此代码仅适用于单个 json 文档。 如果该值包含多条记录作为换行符分隔的 json,Spark 将无法正确解码。
我不想为每个事件都发送一条 kafka 消息。我怎样才能做到这一点?
解决方法
我设法以这种方式做我正在寻找的东西,用换行符拆分完整的文本字符串,然后在行中分解要使用架构解析的数组:
events = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers","localhost:9092") \
.option("subscribe","quickstart-events") \
.option("startingOffsets","earliest")\
.load()\
.selectExpr("CAST(value AS STRING) as data")
events = events.select(explode(split(events.data,'\n')))
events = events.select(from_json(col("col"),event_schema).alias('value'))
events = events.selectExpr('value.*')```