Spark 中的直通异构非统一JSON 列

问题描述

我们使用 Apache Spark 3.1.x 处理数据,其中一个字段包含完全自由格式的 JSON,因此各个记录可以包含相同的键,但具有不同的数据类型(本示例中的 payload.field1 可以是字符串、布尔值或数字):

{"timestamp": "2021-07-30T09:41:51Z","payload": {"field1": "some text"}}
{"timestamp": "2021-07-30T09:41:52Z","payload": {"field1": true}}
{"timestamp": "2021-07-30T09:41:53Z","payload": {"field1": 123}}

我们的目标是保持 payload 字段完好无损。当我们让 Spark 自动检测架构时:

 Dataset<Row> events = spark.read().json("file:////users/user/input.json");

 // some processing is going on here

 events.write().json("file:///users/user/output.json");

输出如下(注意 payload.field 现在是一个字符串):

{"payload":{"field1":"some text"},"timestamp":"2021-07-30T09:41:51Z"}
{"payload":{"field1":"true"},"timestamp":"2021-07-30T09:41:51Z"}
{"payload":{"field1":"123"},"timestamp":"2021-07-30T09:41:52Z"}

Spark 的 printSchema() 输出

root
 |-- payload: struct (nullable = true)
 |    |-- field1: string (nullable = true)
 |-- timestamp: string (nullable = true)

到目前为止,我们提出的最佳解决方法是:

Dataset<String> eventsAsstring = spark.read().text("file:////users/user/input.json").as(Encoders.STRING());

Dataset<Row> events2 = eventsAsstring.select( //
        get_json_object(col("value"),"$.timestamp").alias("timestamp"),//
        get_json_object(col("value"),"$.payload").alias("payload") // This will keep payload as string for Spark
);

// Do some processing of events here

// We have to write JSON as string to prevent Spark from encoding payload's field JSON:
events2.withColumn("joined",concat( //
        format_string("{\"timestamp\":\"%s\",",col("timestamp")),//
        format_string("\"payload\":%s}",col("payload")) //
)).select(col("joined")).write().text("file:///users/user/output.txt"); 
   

我们得到的输出是我们想要的,数据类型不变:

{"timestamp":"2021-07-30T09:41:51Z","payload":{"field1":"some text"}}
{"timestamp":"2021-07-30T09:41:52Z","payload":{"field1":true}}
{"timestamp":"2021-07-30T09:41:53Z","payload":{"field1":123}}

上述解决方案有效,但感觉超级笨拙。也许我们在这里遗漏了一些明显的东西?

提前致谢!

解决方法

对于读取,我们可以在读取之前指定架构。 对于写作,我想不出更好的主意。

    val schema = StructType(Array[StructField](
      StructField("timestamp",DataTypes.StringType,false,Metadata.empty),StructField("payload",Metadata.empty)) // force string type
    )

    val df: Dataset[Row] = spark.read.schema(schema).json(ds)

    df.map(r => "{\"timestamp\": \"%s\",\"payload\": %s".format(r.getString(0),r.getString(1)))
      .write.text("xxx")