使用kafka方法和火花流从kafka消费会产生不同的结果

问题描述

我正在尝试使用星火流消耗卡夫卡的一些数据。

我已经创建了2个工作,

  1. 一个简单的卡夫卡工作,使用:
consumeFirstStringMessageFrom(topic)

给出主题期望值。

{
  "data": {
    "type": "SA_LIST","login": "username@mycompany.com","updateDate": "2020-09-09T14:58:39.775Z","content": [
      {
        "sku": "800633955","status": "ACTIVE","quantity": 1
      }
    ],"saCode": "E40056","clientId": "30179801688090","$setonInsert": {
      "__v": 0
    }
  },"operation": "UPDATE","type": "List"
}
  1. 火花流作业:
val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers",kafkaConfig.broker)
      .option("subscribe",kafkaConfig.topic)
      .option("startingOffsets",kafkaConfig.startingOffsets)
      .load()

 df.writeStream
      .outputMode(OutputMode.Append())
      .format("console")
      .option("truncate",false)
      .trigger(Trigger.ProcessingTime("2 seconds"))
      .start().awaitTermination()

显示以下结果

{
  "key": "I4NTY4NV9MSVNUX1dJU0hMSVNUIg==","value": "eyJkYXRhIjp7InR5cGUiOiJXSVNITElTVCIsImxvZ2luIjoiZHJlYW1lcjJAeW9wbWFpbC5jb20iLCJ1cGRhdGVEYXRZSI6Ikxpc3QifQ==","topic": "PLP_GLOBAL_QA","partition": 0,"offset": 1826,"timestamp": "2020-09-10T16:09:08.606Z","timestampType": 0
}

它似乎显示主题信息(键,值,主题,分区,偏移量...)。我缺少什么吗?

我可以根据需要添加更多信息。

解决方法

Spark Streaming作业以序列化的形式显示数据,而您的Kafka Consumer已经反序列化了。

根据Spark Structured Kafka integration guide,您不仅获得了Kafka消息的键和值,还获得了其他(元)信息。这是您从Kafka收到的每条消息的架构:

Column      Type
key         binary
value       binary
topic       string
partition   int
offset      long
timestamp   timestamp
timestampType   int

如果您只想选择键和值,甚至只选择值,则可以选择它们并将它们转换为人类可读的字符串:

[...]
  .load()
  .selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")
  .as[(String,String)]