在 Pyspark 结构化流中仅捕获 CDC 的有效负载?

问题描述

  • 我正在尝试建立从 sql Server 到 Pyspark 的管道以捕获 sql Server 中的数据更改,我已准备好一切:
  • 问题是:当我尝试使用控制台消费者检查数据更改是否通过 Kafka 时,它向我显示 JSON 格式的消息分为两条记录:Schema 和 Payload,在 Payload 内部有 Before 和 After您分别是更改前的数据和更改后的数据。
    • 我只在有效载荷中受到了治疗-->在此 JSON 消息的一部分之后
      • 因为当我像这样流式传输它时,在 Jupyter 命令行中我需要的字段上显示 null,我理解这是因为 JSON 格式很复杂
    • 这是我的 pyspark 代码
     import os

os.environ['PYSPARK_SUBMIT_ARGS'] = f'--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 pyspark-shell'

import findspark

findspark.init()

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time

kafka_topic_name = "test-spark"
kafka_bootstrap_servers = '192.168.1.3:9092'

spark = SparkSession \
    .builder \
    .appName("PySpark Structured Streaming with Kafka and Message Format as JSON") \
    .master("local[*]") \
    .getorCreate()

# Construct a streaming DataFrame that reads from TEST-SPARK
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers",kafka_bootstrap_servers) \
    .option("subscribe",kafka_topic_name) \
    .load()

print("Printing Schema of df: ")
df.printSchema()


df1 = df.selectExpr("CAST(value AS STRING)","timestamp")
df1.printSchema()

 schema = StructType() \
        .add("name",StringType()) \
        .add("type",StringType())

df2 = df1\
        .select(from_json(col("value"),schema)\
        .alias("records"),"timestamp")
    df3 = df2.select("records.*","timestamp")

  print("Printing Schema of records_df3: ")
    df3.printSchema()

 records_write_stream = df3 \
        .writeStream \
        .trigger(processingTime='5 seconds') \
        .outputMode("update") \
        .option("truncate","false")\
        .format("console") \
        .start()
    records_write_stream.awaitTermination()

    print("Stream Data Processing Application Completed.")
  • 这是一张图片显示了到达 Kafka 的 CDC 消息:

enter image description here

  • 如果有人知道如何只消耗有效负载-->在参与 Pyspark 结构化流媒体之后,请帮助我。

解决方法

您应该将您的 Debezeium 连接器修改为具有 value.converter.schemas.enabled=false,然后您将只有 payload 字段可供使用。

否则,您可以为整个对象创建一个类/模式以及 from_json() 函数,或者将值保留为字符串并使用 get_json_object() Spark 函数解析数据

同样相关 - 您可能想要提取 NewRecordState

,

-搜索更多信息后,我发现了如何仅显示和捕获 CDC 消息的有效负载部分。

  • 您需要将其添加到您的 Worker.properties 中:
value.converter=org.apache.kafka.connect.json.JsonConverter

value.converter.schemas.enable=false