从Kafka主题读取文件路径,然后在结构化流中读取文件并写入DeltaLake

问题描述

我有一个用例,其中存储在 s3 中的 json 记录的文件路径作为 kafka 卡夫卡中的消息。我必须使用 Spark 结构化流处理数据。

我想到的设计如下:

  1. 在 kafka Spark 结构化流中,读取包含数据路径的消息。
  2. 在驱动程序中收集消息记录。 (消息很小)
  3. 从数据位置创建数据框。
kafkaDf.select($"value".cast(StringType))
       .writeStream.foreachBatch((batchDf:DataFrame,batchId:Long) =>  {
  //rough code
  //collect to driver
  val records = batchDf.collect()
  //create dataframe and process
  records foreach((rec: Row) =>{
    println("records:######################",rec.toString())
    val path = rec.getAs[String]("data_path")
    val dfToProcess = spark.read.json(path)
    ....
  })
}

我想知道您的意见,这种方法好不好?特别是如果在调用 collect 后创建 Dataframe 存在一些问题。 如果有更好的方法,请告诉我。

解决方法

你的想法完美无缺。

实际上,必须将您的 Dataframe 收集到驱动程序。否则,您无法通过在每个执行器上调用 SparkSession 来创建分布式数据集。如果没有 collect,您最终会遇到 NullPointerException。

我稍微重新编写了您的代码 sceleton,并实现了如何将您的数据帧写入增量表的部分(基于您的其他 question)。此外,我使用的是 Dataset[String] 而不是 Dataframe[Row],这让生活更轻松。

将 Spark 3.0.1 与 delta-core 0.7.0 一起使用效果很好。例如我的测试文件看起来像

{"a":"foo1","b":"bar1"}
{"a":"foo2","b":"bar2"}

我将该文件的位置发送到名为“test”的 Kafka 主题,并应用以下代码来解析该文件,并使用以下代码将其列(基于给定架构)写入增量表中:

  val spark = SparkSession.builder()
    .appName("KafkaConsumer")
    .master("local[*]")
    .getOrCreate()

  val jsonSchema = new StructType()
    .add("a",StringType)
    .add("b",StringType)

  val deltaPath = "file:///tmp/spark/delta/test"

  import spark.implicits._
  val kafkaDf = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers","localhost:9092")
    .option("subscribe","test")
    .option("startingOffsets","latest")
    .option("failOnDataLoss","false")
    .load()
    .selectExpr("CAST(value AS STRING) as data_path")
    .as[String]

  kafkaDf.writeStream.foreachBatch((batchDf:Dataset[String],batchId:Long) => {
    // collect to driver
    val records = batchDf.collect()

    // create dataframe based on file location and process and write to Delta-Lake
    records.foreach((path: String) => {
      val dfToProcess = spark.read.schema(jsonSchema).json(path)
      dfToProcess.show(false) // replace this line with your custom processing logic
      dfToProcess.write.format("delta").save(deltaPath)
    })
  }).start()

  spark.streams.awaitAnyTermination()

show 调用的输出符合预期:

+----+----+
|a   |b   |
+----+----+
|foo1|bar1|
|foo2|bar2|
+----+----+

并且数据已经作为增量表写入到通过deltaPath指定的位置

/tmp/spark/delta/test$ ll
total 20
drwxrwxr-x 3 x x 4096 Jan 20 13:37 ./
drwxrwxr-x 3 x x 4096 Jan 20 13:37 ../
drwxrwxr-x 2 x x 4096 Jan 20 13:37 _delta_log/
-rw-r--r-- 1 x x  595 Jan 20 13:37 part-00000-b6a540ec-7e63-4d68-a09a-405142479cc1-c000.snappy.parquet
-rw-r--r-- 1 x x   16 Jan 20 13:37 .part-00000-b6a540ec-7e63-4d68-a09a-405142479cc1-c000.snappy.parquet.crc