在 Spark Structured Streaming 中从 Kafka 消息中读取换行符分隔的 json

问题描述

我开发了一个 Python Kafka 生产者,它将多个 json 记录作为 nd-json 二进制字符串发送到 Kafka 主题。 然后我尝试使用 PySpark 在 Spark Structured Streaming 中阅读这些消息,如下所示:

events_df = select(from_json(col("value").cast("string"),schema).alias("value"))

但此代码仅适用于单个 json 文档。 如果该值包含多条记录作为换行符分隔的 json,Spark 将无法正确解码。

我不想为每个事件都发送一条 kafka 消息。我怎样才能做到这一点?

解决方法

我设法以这种方式做我正在寻找的东西,用换行符拆分完整的文本字符串,然后在行中分解要使用架构解析的数组:

    events = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers","localhost:9092") \
        .option("subscribe","quickstart-events") \
        .option("startingOffsets","earliest")\
        .load()\
        .selectExpr("CAST(value AS STRING) as data")
    
    events = events.select(explode(split(events.data,'\n')))
    events = events.select(from_json(col("col"),event_schema).alias('value'))
    events = events.selectExpr('value.*')```