问题描述
Input_pyspark_dataframe:
id name collection student.1.price student.2.price student.3.price
111 aaa 1 100 999 232
222 bbb 2 200 888 656
333 ccc 1 300 777 454
444 ddd 1 400 666 787
output_pyspark_dataframe
id name collection price
111 aaa 1 100
222 bbb 2 888
333 ccc 1 300
444 ddd 3 787
我们可以使用集合列中的值找到每个ID的正确价格
问题
使用pyspark,如何通过动态构建列名student.{collection}.price
来找到每个ID的正确价格?
请让我知道。
解决方法
有点完整,但是您可以这样做。
fields
将为您提供struct字段student
的字段名称。您应该手动给它,最后得到1,2,3
。
然后第一行将student.{i}.price
的列i = range(1,4)
组成一个数组。类似地,第二行则组成了文字{i}
的数组。
现在,将这两个数组压缩为一个数组,例如
[('1',col('student.1.price')),...]
并爆炸数组,然后变成:
('1',col('student.1.price'))
('2',col('student.2.price'))
('3',col('student.3.price'))
由于arrays_zip
为您提供了一个struct数组,因此以上结果为struct type。通过使用struct key作为列来获取每个值,即index
和price
。
最后,您可以比较collection
和index
(这实际上是Student结构列的字段名称)。
import pyspark.sql.functions as f
fields = [field.name for field in next(field for field in df.schema.fields if field.name == 'student').dataType.fields]
df.withColumn('array',f.array(*map(lambda x: 'student.' + x + '.price',fields))) \
.withColumn('index',f.array(*map(lambda x: f.lit(x),fields))) \
.withColumn('zip',f.arrays_zip('index','array')) \
.withColumn('zip',f.explode('zip')) \
.withColumn('index',f.col('zip.index')) \
.withColumn('price',f.col('zip.array')) \
.filter('collection = index') \
.select('id','name','collection','price') \
.show(10,False)
+---+----+----------+-----+
|id |name|collection|price|
+---+----+----------+-----+
|111|aaa |1 |100 |
|222|bbb |2 |888 |
|333|ccc |1 |300 |
|444|ddd |3 |787 |
+---+----+----------+-----+