PySpark查询一个列名,该值出现在另一列中

问题描述

Input_pyspark_dataframe:

id   name  collection  student.1.price  student.2.price  student.3.price
111  aaa      1           100              999               232
222  bbb      2           200              888               656
333  ccc      1           300              777               454
444  ddd      1           400              666               787

output_pyspark_dataframe

id   name  collection    price  
111  aaa      1           100           
222  bbb      2           888            
333  ccc      1           300             
444  ddd      3           787       

我们可以使用集合列中的值找到每个ID的正确价格

问题

使用pyspark,如何通过动态构建列名student.{collection}.price来找到每个ID的正确价格?

请让我知道。

解决方法

有点完整,但是您可以这样做。

fields将为您提供struct字段student的字段名称。您应该手动给它,最后得到1,2,3

然后第一行将student.{i}.price的列i = range(1,4)组成一个数组。类似地,第二行则组成了文字{i}的数组。

现在,将这两个数组压缩为一个数组,例如

[('1',col('student.1.price')),...]

并爆炸数组,然后变成:

('1',col('student.1.price'))
('2',col('student.2.price'))
('3',col('student.3.price'))

由于arrays_zip为您提供了一个struct数组,因此以上结果为struct type。通过使用struct key作为列来获取每个值,即indexprice

最后,您可以比较collectionindex(这实际上是Student结构列的字段名称)。

import pyspark.sql.functions as f

fields = [field.name for field in next(field for field in df.schema.fields if field.name == 'student').dataType.fields]

df.withColumn('array',f.array(*map(lambda x: 'student.' + x + '.price',fields))) \
  .withColumn('index',f.array(*map(lambda x: f.lit(x),fields))) \
  .withColumn('zip',f.arrays_zip('index','array')) \
  .withColumn('zip',f.explode('zip')) \
  .withColumn('index',f.col('zip.index')) \
  .withColumn('price',f.col('zip.array')) \
  .filter('collection = index') \
  .select('id','name','collection','price') \
  .show(10,False)

+---+----+----------+-----+
|id |name|collection|price|
+---+----+----------+-----+
|111|aaa |1         |100  |
|222|bbb |2         |888  |
|333|ccc |1         |300  |
|444|ddd |3         |787  |
+---+----+----------+-----+