问题描述
我正在处理一个数据集,其中包含有关该特定项目销售数量的项目明智信息。但是,我打算估算的“已售数量”列中有一些负值。这里使用的逻辑是用日期级别的每个项目的销售数量模式替换这些负值。我已经计算了销售数量的每个不同值的计数,并获得了每个给定日期特定项目的最大销售数量。但是,我无法找到一个函数来将负值替换为每个项目* 日期组合的最大销售数量。我对 pyspark 比较新。在这种情况下使用哪种方法最好?
解决方法
根据您提供的有限信息,您可以尝试这样的操作-
from pyspark import SparkContext
from pyspark.sql import SQLContext
from functools import reduce
import pyspark.sql.functions as F
from pyspark.sql import Window
sc = SparkContext.getOrCreate()
sql = SQLContext(sc)
input_list = [
(1,10,"2019-11-07"),(1,5,"2019-11-08"),6,7,"2019-11-09"),8,-10,(2,3,9,2,"2019-11-09")
]
sparkDF = sql.createDataFrame(input_list,['product_id','sold_qty','date'])
sparkDF = sparkDF.withColumn('date',F.to_date(F.col('date'),'yyyy-MM-dd'))
模式实现
#### Mode Implemention
modeDF = sparkDF.groupBy('date','sold_qty')\
.agg(F.count(F.col('sold_qty')).alias('mode_count'))\
.select(F.col('date'),F.col('sold_qty').alias('mode_sold_qty'),F.col('mode_count'))
window = Window.partitionBy("date").orderBy(F.desc("mode_count"))
#### Filtering out the most occurred value
modeDF = modeDF\
.withColumn('order',F.row_number().over(window))\
.where(F.col('order') == 1)\
与 Base DataFrame 合并以进行估算
sparkDF = sparkDF.join(modeDF,sparkDF['date'] == modeDF['date'],'inner'
).select(sparkDF['*'],modeDF['mode_sold_qty'],modeDF['mode_count'])
sparkDF = sparkDF.withColumn('imputed_sold_qty',F.when(F.col('sold_qty') < 0,F.col('mode_sold_qty'))\
.otherwise(F.col('sold_qty')))
>>> sparkDF.show(100)
+----------+--------+----------+-------------+----------+----------------+
|product_id|sold_qty| date|mode_sold_qty|mode_count|imputed_sold_qty|
+----------+--------+----------+-------------+----------+----------------+
| 1| 7|2019-11-09| 2| 3| 7|
| 1| 7|2019-11-09| 2| 3| 7|
| 1| 8|2019-11-09| 2| 3| 8|
| 1| 8|2019-11-09| 2| 3| 8|
| 1| 8|2019-11-09| 2| 3| 8|
| 1| -10|2019-11-09| 2| 3| 2|
| 2| 5|2019-11-09| 2| 3| 5|
| 2| 5|2019-11-09| 2| 3| 5|
| 2| 2|2019-11-09| 2| 3| 2|
| 2| 2|2019-11-09| 2| 3| 2|
| 2| 2|2019-11-09| 2| 3| 2|
| 2| -10|2019-11-09| 2| 3| 2|
| 1| 5|2019-11-08| 9| 1| 5|
| 1| 6|2019-11-08| 9| 1| 6|
| 2| 9|2019-11-08| 9| 1| 9|
| 2| -10|2019-11-08| 9| 1| 9|
| 1| 10|2019-11-07| 5| 2| 10|
| 1| 5|2019-11-07| 5| 2| 5|
| 1| 5|2019-11-07| 5| 2| 5|
| 2| 10|2019-11-07| 5| 2| 10|
| 2| 3|2019-11-07| 5| 2| 3|
| 2| 9|2019-11-07| 5| 2| 9|
+----------+--------+----------+-------------+----------+----------------+