通过其他列过滤的Spark Dataframe汇总总和

问题描述

我有一个Spark数据框,看起来像这样,其中商品编号,国家/地区代码和日期的每种组合都包含一行,而该组合中存在金额值。此数据框中大约有40万行。

articlenumber   countrycode   date         amount
--------------------------------------------------
4421-222-222    DE            2020-02-05   200
1234-567-890    EN            2019-05-23   42
1345-457-456    EN            2019-12-12   107

现在,我需要一个附加列“金额12M”,它根据以下规则为每一行计算其值:

在每一行中,“金额12M”应包含“金额”中所有值的总和,其中商品编号和国家/地区代码与该特定行中的值匹配,并且日期位于该行中日期之前的12个月之间

是否需要为尚无值的日期/国家/商品编号组合添加金额为0的行?

由于我不是编程专家(工程专业的学生),我需要一些帮助如何在处理该数据框的python脚本中实现这一目标。

感谢您对此提出任何想法。

解决方法

已编辑:

import pyspark.sql.functions as f
from pyspark.sql import Window

w = Window.partitionBy('articlenumber','countrycode').orderBy('date').orderBy('yearmonth').rangeBetween(-11,0)

df.withColumn('yearmonth',f.expr('(year(date) - 2000) * 12 + month(date)')) \
  .withColumn('amount 12M',f.sum('amount').over(w)) \
  .orderBy('date').show(10,False)

+-------------+-----------+----------+------+---------+----------+
|articlenumber|countrycode|date      |amount|yearmonth|amount 12M|
+-------------+-----------+----------+------+---------+----------+
|4421-222-222 |DE         |2019-02-05|100   |230      |100       |
|4421-222-222 |DE         |2019-03-01|50    |231      |150       |
|1234-567-890 |EN         |2019-05-23|42    |233      |42        |
|1345-457-456 |EN         |2019-12-12|107   |240      |107       |
|4421-222-222 |DE         |2020-02-05|200   |242      |250       |
+-------------+-----------+----------+------+---------+----------+


我不确定确切的12个月,但这会起作用。

import pyspark.sql.functions as f
from pyspark.sql import Window

w = Window.partitionBy('articlenumber','countrycode').orderBy('unix_date').rangeBetween(- 365 * 86400,0)

df.withColumn('unix_date',f.unix_timestamp('date','yyyy-MM-dd')) \
  .withColumn('amount 12M',False)

+-------------+-----------+----------+------+----------+----------+
|articlenumber|countrycode|date      |amount|unix_date |amount 12M|
+-------------+-----------+----------+------+----------+----------+
|4421-222-222 |DE         |2019-02-05|100   |1549324800|100       |
|4421-222-222 |DE         |2019-02-06|50    |1549411200|150       |
|1234-567-890 |EN         |2019-05-23|42    |1558569600|42        |
|1345-457-456 |EN         |2019-12-12|107   |1576108800|107       |
|4421-222-222 |DE         |2020-02-05|200   |1580860800|350       |
+-------------+-----------+----------+------+----------+----------+

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...