问题描述
我有一个需要流式传输的JSON文件,因此我需要预先声明架构。该文件有300列以上,因此我想读取数据并推断模式,将该模式文件(以任何可用的格式保存)到S3,然后在使用该模式文件定义流传输的同时继续传输文件模式。
到目前为止,我所做的是:
read_df = spark.read.load(inputPath,format='json',inferSchema=True)
my_schema = read_df.schema
在这一点上,我尝试通过my_schema.json
提取模式,该模式显然是StructType
,但是我不知道如何将其持久化到S3。 my_schema.jsonValue()
输出JSON,但是同样,我不确定如何将其写入S3。我尝试过:
with open("/dbfs:/my-mounted_drive/my_schema.json","w") as f:
json.dump(my_schema.jsonValue(),f)
这段代码给我这个错误:FileNotFoundError: [Errno 2] No such file or directory:
with open("my_schema.json","w") as f:
json.dump(my_schema.jsonValue(),f)
我完全不知道如何完成该解决方案,但是完美的流程将是读取并推断->保存到S3->流数据,其中 schema=<myfile>
有一些类似的问题,但没有重复,此外,这些解决方案都不适合我。
TIA
解决方法
您可以使用以下语法的dbutils
put方法
dbutils.fs.put(s"/mnt/" + MountName,json.dump(my_schema.jsonValue()))
考虑到您在MountName
中具有有效的装载名称