问题描述
我正在尝试使用星火流消耗卡夫卡的一些数据。
我已经创建了2个工作,
- 一个简单的卡夫卡工作,使用:
consumeFirstStringMessageFrom(topic)
给出主题期望值。
{
"data": {
"type": "SA_LIST","login": "username@mycompany.com","updateDate": "2020-09-09T14:58:39.775Z","content": [
{
"sku": "800633955","status": "ACTIVE","quantity": 1
}
],"saCode": "E40056","clientId": "30179801688090","$setonInsert": {
"__v": 0
}
},"operation": "UPDATE","type": "List"
}
- 火花流作业:
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers",kafkaConfig.broker)
.option("subscribe",kafkaConfig.topic)
.option("startingOffsets",kafkaConfig.startingOffsets)
.load()
df.writeStream
.outputMode(OutputMode.Append())
.format("console")
.option("truncate",false)
.trigger(Trigger.ProcessingTime("2 seconds"))
.start().awaitTermination()
它显示以下结果
{
"key": "I4NTY4NV9MSVNUX1dJU0hMSVNUIg==","value": "eyJkYXRhIjp7InR5cGUiOiJXSVNITElTVCIsImxvZ2luIjoiZHJlYW1lcjJAeW9wbWFpbC5jb20iLCJ1cGRhdGVEYXRZSI6Ikxpc3QifQ==","topic": "PLP_GLOBAL_QA","partition": 0,"offset": 1826,"timestamp": "2020-09-10T16:09:08.606Z","timestampType": 0
}
它似乎显示了主题信息(键,值,主题,分区,偏移量...)。我缺少什么吗?
我可以根据需要添加更多信息。
解决方法
Spark Streaming作业以序列化的形式显示数据,而您的Kafka Consumer已经反序列化了。
根据Spark Structured Kafka integration guide,您不仅获得了Kafka消息的键和值,还获得了其他(元)信息。这是您从Kafka收到的每条消息的架构:
Column Type
key binary
value binary
topic string
partition int
offset long
timestamp timestamp
timestampType int
如果您只想选择键和值,甚至只选择值,则可以选择它们并将它们转换为人类可读的字符串:
[...]
.load()
.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")
.as[(String,String)]