PySpark 中的移动平均和标准差

问题描述

我正在阅读。来自 datalake 的约 500 万条记录数据集。我需要按某些分组计算均值、计数、标准差。我将这些结果保存到蜂巢表中。我使用以下代码计算了历史人口

历史:

from pyspark.sql import functions as F
# read source data to dfmis dataframe and aggregate using below. 
df_stats = dfmis.groupBy("col1","col2","col3","col4")
                .agg(F.mean('value')
                .alias('mean'),F.count('value').alias('count'),F.stddev('value').alias('standard_deviation'))

代码输出将如下所示。

col1 col2 col3 col4 平均 计数 标准偏差
1 阈值1 0 20000 0.038937 750 0.022564
1 阈值1 20000 40000 0.047813 260 0.032230
1 阈值2 0 20000 0.026144 3 0.000000
1 阈值2 20000 40000 0.329935 25 0.149295
1 阈值3 0 20000 0.034335 458 0.017798
1 阈值3 20000 40000 0.028277 190 0.010326
2 阈值1 0 20000 0.041236 828 0.034252
2 阈值1 20000 40000 0.073214 228 0.050773
2 阈值2 0 20000 0.090278 32 0.078175
2 阈值2 20000 40000 0.372851 65 0.248227
2 阈值3 0 20000 0.067187 758 0.042333
2 阈值3 20000 40000 0.067320 240 0.021634
3 阈值1 0 20000 0.039631 724 0.029227
3 阈值1 20000 40000 0.042689 207 0.022464
3 阈值2 0 20000 0.154486 33 0.108949
3 阈值2 20000 40000 0.470767 73 0.238893
3 阈值3 0 20000 0.048505 667 0.024955
3 阈值3 20000 40000 0.063335 226 0.024734
4 阈值1 0 20000 0.041083 945 0.027195
4 阈值1 20000 40000 0.049683 266 0.026589
4 阈值2 0 20000 0.109329 11 0.097694

增量

对于增量数据 - 我每天会收到 100 万到 150 万条记录,而且将来还会增长。因此,这个想法是从上面的 hive/output 中读取历史平均值、标准偏差和计数(按每组),并使用这些值来计算新的平均值、标准偏差和计数,并用新的平均值、计数、stddev 覆盖 hive 表数据每组。

我查看了几种解决方案,但无法做到。我查看了 Welford 的在线算法,但并没有完全理解它。寻找有关如何处理 pyspark、pandas 或 numpy 解决方案中的增量均值、stddev 的指导。任何帮助深表感谢!谢谢!

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)