如何在pyspark数据帧读取方法中包括分区列 读取数据写数据从特定分区读取Avro分区数据

问题描述

我正在从镶木地板文件中编写基于Avro的文件。我已阅读以下文件

读取数据

dfParquet = spark.read.format("parquet").option("mode","FAILFAST")
    .load("/Users/rashmik/flight-time.parquet")

写数据

我以Avro格式编写了文件,如下所示:

dfParquetRePartitioned.write \
    .format("avro") \
    .mode("overwrite") \
    .option("path","datasink/avro") \
    .partitionBy("OP_CARRIER") \
    .option("maxRecordsPerFile",100000) \
    .save()

按预期,我将数据按OP_CARRIER进行了分区。

从特定分区读取Avro分区数据

在另一个作业中,我需要从上述作业的输出中读取数据,即从datasink/avro目录中读取数据。我正在使用以下代码datasink/avro

中读取内容
dfAvro = spark.read.format("avro") \
    .option("mode","FAILFAST") \
    .load("datasink/avro/OP_CARRIER=AA")

它已成功读取数据,但是正如预期的那样,OP_CARRIER数据列中的dfAvro列不可用,因为它是第一个作业的分区列。现在,我的要求是在第二个数据帧(即OP_CARRIER)中也包含dfAvro字段。有人可以帮我吗?

我正在参考spark document中的文档,但是找不到相关信息。任何指针都将非常有帮助。

解决方法

您使用不同的别名复制相同的列值。

dfParquetRePartitioned.withColumn("OP_CARRIER_1",lit(df.OP_CARRIER)) \
.write \
.format("avro") \
.mode("overwrite") \
.option("path","datasink/avro") \
.partitionBy("OP_CARRIER") \
.option("maxRecordsPerFile",100000) \
.save()

这会给您您想要的。但是别名不同。 或者,您也可以在阅读过程中这样做。如果位置是动态的,那么您可以轻松地添加该列。

path = "datasink/avro/OP_CARRIER=AA"
newcol = path.split("/")[-1].split("=") 
dfAvro = spark.read.format("avro") \
.option("mode","FAILFAST") \
.load(path).withColumn(newcol[0],lit(newcol[1]))

如果该值是静态的,则在读取数据期间更容易添加它。