问题描述
我正在使用wholeTextFiles()
在python spark中读取.txt文件。我知道在读取wholeTextFiles()
之后,生成的rdd将具有格式(文件路径,内容)。我有多个文件要读取。我想从文件路径中剪切文件名,并保存到spark数据框和文件名的一部分,作为HDFS位置中的日期文件夹。但是在保存时,我没有得到相应的文件名。有什么办法吗?下面是我的代码
base_data = sc.wholeTextFiles("/user/nikhil/raw_data/")
data1 = base_data.map(lambda x : x[0]).flatMap(lambda x : x.split('/')).filter(lambda x : x.startswith('CH'))
data2=data1.flatMap(lambda x : x.split('F_')).filter(lambda x : x.startswith('2'))
print(data1.collect())
print(data2.collect())
df.repartition(1).write.mode('overwrite').parquet(outputLoc + "/xxxxx/" + data2)
logdf = sqlContext.createDataFrame(
[(data1,pstrt_time,pend_time,'DeltaLoad Completed')],["filename","process_start_time","process_end_time","status"])`
输出:
data1: ['CHNC_P0BCDNAF_20200217','CHNC_P0BCDNAF_20200227','CHNC_P0BCDNAF_20200615','CHNC_P0BCDNAF_20200925']
data2: ['20200217','20200227','20200615','20200925']
解决方法
这里有一个 Scala版本,可以通过良好的自我轻松转换为pyspark:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType
val files = sc.wholeTextFiles("/FileStore/tables/*ZZ.txt",0)
val res1 = files.map(line => (line._1,line._2.split("\n").flatMap(x => x.split(" ")) )).map(elem => {(elem._1,elem._2) })
val res2 = res1.flatMap {
case (x,y) => {
y.map(z => (x,z))
}}
val res3 = res2.map(line => (line._1,line._1.split("/")(3),line._2))
val df = res3.toDF()
val df2 = df.withColumn("s",split($"_1","/"))
.withColumn("f1",$"s"(3))
.withColumn("f2",$"f1".cast(StringType)) // avoid issues with split subsequently
.withColumn("filename",substring_index(col("f2"),".",1))
df2.show(false)
df2.repartition($"filename").write.mode("overwrite").parquet("my_parquet") // default 200 and add partitionBy as well for good measure on your `write`.
一些示例数据,您可以通过.drop
或使用select
剥离:
+--------------------------------+---------+-------+-------------------------------------+---------+---------+--------+
|_1 |_2 |_3 |s |f1 |f2 |filename|
+--------------------------------+---------+-------+-------------------------------------+---------+---------+--------+
|dbfs:/FileStore/tables/AAAZZ.txt|AAAZZ.txt|wwww |[dbfs:,FileStore,tables,AAAZZ.txt]|AAAZZ.txt|AAAZZ.txt|AAAZZ |
|dbfs:/FileStore/tables/AAAZZ.txt|AAAZZ.txt|wwww |[dbfs:,AAAZZ.txt]|AAAZZ.txt|AAAZZ.txt|AAAZZ |
|dbfs:/FileStore/tables/AAAZZ.txt|AAAZZ.txt|rrr |[dbfs:,AAAZZ.txt]|AAAZZ.txt|AAAZZ.txt|AAAZZ |
|dbfs:/FileStore/tables/AAAZZ.txt|AAAZZ.txt| |[dbfs:,AAAZZ.txt]|AAAZZ.txt|AAAZZ.txt|AAAZZ |
|dbfs:/FileStore/tables/AAAZZ.txt|AAAZZ.txt|4445
...
标点符号去除的常用方面,要应用的空间修整。当然,您需要适应文件名的情况,我看不到。
问题是您无法拆分已经分裂的东西。