问题描述
我通过Kafka主题接收JSON字符串格式的流数据,我将每个记录与预定义的架构进行匹配,并读取必填字段。但是我想捕获所有与给定架构不匹配的动态出现的字段,并将它们发送到Elastic Search索引/将它们收集到单独的文件中(基本上是一种存储与给定架构不匹配的额外字段的方法)。
JSON record is as follows:
{"Name":"Rishi","Age":"15","Gender":"Male","Standard":"8","Hobby":"Dance","Sport":"Hockey","NSS":"Active"}
or it can be as follows:
{"Name":"Rishi","NSS":"Active","Culturals":"Active","Attendance":"90"}
If my schema has only five fieldsdefined as follows:
from pyspark.sql.types import StringType,StructField,StructType,IntegerType
schema = StructType([
StructField("Name",StringType(),True),StructField("Age",IntegerType(),StructField("Gender",StructField("Standard",StructField("Sport",])
因此,我的数据框使用此架构仅获取这5个字段。但是我也想收集和存储其他字段。在Pyspark中有什么方法可以实现?
谢谢!
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)