问题描述
我正在阅读。来自 datalake 的约 500 万条记录数据集。我需要按某些分组计算均值、计数、标准差。我将这些结果保存到蜂巢表中。我使用以下代码计算了历史人口
历史:
from pyspark.sql import functions as F
# read source data to dfmis dataframe and aggregate using below.
df_stats = dfmis.groupBy("col1","col2","col3","col4")
.agg(F.mean('value')
.alias('mean'),F.count('value').alias('count'),F.stddev('value').alias('standard_deviation'))
col1 | col2 | col3 | col4 | 平均 | 计数 | 标准偏差 |
---|---|---|---|---|---|---|
1 | 阈值1 | 0 | 20000 | 0.038937 | 750 | 0.022564 |
1 | 阈值1 | 20000 | 40000 | 0.047813 | 260 | 0.032230 |
1 | 阈值2 | 0 | 20000 | 0.026144 | 3 | 0.000000 |
1 | 阈值2 | 20000 | 40000 | 0.329935 | 25 | 0.149295 |
1 | 阈值3 | 0 | 20000 | 0.034335 | 458 | 0.017798 |
1 | 阈值3 | 20000 | 40000 | 0.028277 | 190 | 0.010326 |
2 | 阈值1 | 0 | 20000 | 0.041236 | 828 | 0.034252 |
2 | 阈值1 | 20000 | 40000 | 0.073214 | 228 | 0.050773 |
2 | 阈值2 | 0 | 20000 | 0.090278 | 32 | 0.078175 |
2 | 阈值2 | 20000 | 40000 | 0.372851 | 65 | 0.248227 |
2 | 阈值3 | 0 | 20000 | 0.067187 | 758 | 0.042333 |
2 | 阈值3 | 20000 | 40000 | 0.067320 | 240 | 0.021634 |
3 | 阈值1 | 0 | 20000 | 0.039631 | 724 | 0.029227 |
3 | 阈值1 | 20000 | 40000 | 0.042689 | 207 | 0.022464 |
3 | 阈值2 | 0 | 20000 | 0.154486 | 33 | 0.108949 |
3 | 阈值2 | 20000 | 40000 | 0.470767 | 73 | 0.238893 |
3 | 阈值3 | 0 | 20000 | 0.048505 | 667 | 0.024955 |
3 | 阈值3 | 20000 | 40000 | 0.063335 | 226 | 0.024734 |
4 | 阈值1 | 0 | 20000 | 0.041083 | 945 | 0.027195 |
4 | 阈值1 | 20000 | 40000 | 0.049683 | 266 | 0.026589 |
4 | 阈值2 | 0 | 20000 | 0.109329 | 11 | 0.097694 |
增量
对于增量数据 - 我每天会收到 100 万到 150 万条记录,而且将来还会增长。因此,这个想法是从上面的 hive/output 中读取历史平均值、标准偏差和计数(按每组),并使用这些值来计算新的平均值、标准偏差和计数,并用新的平均值、计数、stddev 覆盖 hive 表数据每组。
我查看了几种解决方案,但无法做到。我查看了 Welford 的在线算法,但并没有完全理解它。寻找有关如何处理 pyspark、pandas 或 numpy 解决方案中的增量均值、stddev 的指导。任何帮助深表感谢!谢谢!
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)