使用pyspark识别大量数据中的更改

问题描述

我有DATE列和RESULT列的大量数据(大约十亿行)。 RESULT列中的值主要是名称,但是有时值会出现明显的偏差。我只想确定偏差较大的日期。 因此,从这样的输入数据帧开始:

+----------+------+
|      DATE|RESULT|
+----------+------+
|2020-06-24|   4.2|
|2020-05-17|   4.5|
|2020-05-11|   4.5|
|2020-07-30|   4.2|
|2020-07-30|   4.2|
|2020-06-29|   4.2|
|2020-06-29|   4.2|
|2020-03-04|   4.5|
|2020-06-01|   4.2|
|2020-06-27|   4.2|
|2020-06-29|   4.2|
|2020-06-29|   4.2|
|2020-04-17|   4.5|
|2020-04-17|   4.5|
|2020-01-04|   4.5|
|2020-02-29|   4.5|
|2020-07-07|   4.2|
|2020-05-07|   4.5|
|2020-06-09|   4.2|
|2020-06-22|   4.2|
+----------+------+

我希望输出为:

+----------+------+
|      DATE|RESULT|
+----------+------+
|2020-05-11|   4.5|
|2020-07-30|   4.2|
|2020-06-29|   4.2|
|2020-04-17|   4.5|
|2020-02-29|   4.5|
|2020-07-07|   4.2|
|2020-05-07|   4.5|
|2020-06-09|   4.2|
+----------+------+

我尝试使用window和lag函数,但是它将整个数据集强制为一个节点,因此失去了使用分布式计算的优势。 我在StackOverflow中遇到了一个建议,该建议使用中位数和均值绝对偏差(MAD)并定义一个阈值以标识具有异常移位的记录,但是我在pyspark.sql.functions库中找不到MAD统计函数。 有谁有更好的主意吗?我将不胜感激。 我在pyspark中编码,但是如果解决方案在spark / scala中也可以。 谢谢

解决方法

您可能会发现此链接对计算 MAD https://www.advancinganalytics.co.uk/blog/2020/9/2/identifying-outliers-in-spark-30

很有用

从以下链接添加相关内容:

MAD=中位数(|xi-xm|)

其中 xm 是数据集的中位数,xi 是数据集中的值。 MAD 是每个值与整个数据集的中位数之差的中位数

考虑一个包含“category”、“data_col”列的 df

'percentile()' 需要一列和一组百分位数来计算(对于中位数,我们可以提供'array(0.5)',因为 50% 的值是中位数)并且将返回一个结果数组。

MADdf = df.groupby('category')\
.agg(F.expr('percentile(data_col,array(0.5))')[0]\
.alias('data_col_median'))\
.join(df,"category","left")\
.withColumn("data_col_difference_median",F.abs(F.col('data_col')-F.col('data_col_median')))\
    .groupby('category','data_col_median')\
    .agg(F.expr('percentile(data_col_difference_median,array(0.5))')[0]\
    .alias('median_absolute_difference'))