问题描述
- 我正在尝试建立从 sql Server 到 Pyspark 的管道以捕获 sql Server 中的数据更改,我已准备好一切:
- 问题是:当我尝试使用控制台消费者检查数据更改是否通过 Kafka 时,它向我显示 JSON 格式的消息分为两条记录:Schema 和 Payload,在 Payload 内部有 Before 和 After您分别是更改前的数据和更改后的数据。
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 pyspark-shell'
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time
kafka_topic_name = "test-spark"
kafka_bootstrap_servers = '192.168.1.3:9092'
spark = SparkSession \
.builder \
.appName("PySpark Structured Streaming with Kafka and Message Format as JSON") \
.master("local[*]") \
.getorCreate()
# Construct a streaming DataFrame that reads from TEST-SPARK
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers",kafka_bootstrap_servers) \
.option("subscribe",kafka_topic_name) \
.load()
print("Printing Schema of df: ")
df.printSchema()
df1 = df.selectExpr("CAST(value AS STRING)","timestamp")
df1.printSchema()
schema = StructType() \
.add("name",StringType()) \
.add("type",StringType())
df2 = df1\
.select(from_json(col("value"),schema)\
.alias("records"),"timestamp")
df3 = df2.select("records.*","timestamp")
print("Printing Schema of records_df3: ")
df3.printSchema()
records_write_stream = df3 \
.writeStream \
.trigger(processingTime='5 seconds') \
.outputMode("update") \
.option("truncate","false")\
.format("console") \
.start()
records_write_stream.awaitTermination()
print("Stream Data Processing Application Completed.")
解决方法
您应该将您的 Debezeium 连接器修改为具有 value.converter.schemas.enabled=false
,然后您将只有 payload
字段可供使用。
否则,您可以为整个对象创建一个类/模式以及 from_json()
函数,或者将值保留为字符串并使用 get_json_object()
Spark 函数解析数据
同样相关 - 您可能想要提取 NewRecordState
,-搜索更多信息后,我发现了如何仅显示和捕获 CDC 消息的有效负载部分。
- 您需要将其添加到您的 Worker.properties 中:
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false