如何滞后偏移

问题描述

我有一个给定的数据框,如下所示:

 TEST_schema = StructType([StructField("Date",StringType(),True),\
                          StructField("START",\
                          StructField("quantity",IntegerType(),\
                          StructField("col1",StructField("col2",True)])
TEST_data = [('2020-08-15','2020-08-19',1,'2020-08-05','2020-08-09'),('2020-08-16',2,'2020-08-09')\,('2020-08-17',3,'2020-08-06',\
             ('2020-08-18',4,'2020-08-10','2020-08-11'),('2020-08-19',5,'2020-08-16','2020-08-19'),\
             ('2020-08-20',6,'2020-08-20','2020-08-25'),('2020-08-21',7,'2020-08-21'),\
             ('2020-08-22',8,'2020-08-24'),('2020-08-23',9,'2020-08-09')]
rdd3 = sc.parallelize(TEST_data)
TEST_df = sqlContext.createDataFrame(TEST_data,TEST_schema)
TEST_df = TEST_df.withColumn("Date",to_date("Date"))\
             .withColumn("START",to_date("START"))\
             .withColumn("col1",to_date("col1"))\
             .withColumn("col2",to_date("col2"))\
           
TEST_df.show()

+----------+----------+--------+----------+----------+
|      Date|     START|quantity|      col1|      col2|
+----------+----------+--------+----------+----------+
|2020-08-15|2020-08-19|       1|2020-08-05|2020-08-09|
|2020-08-16|2020-08-19|       2|2020-08-05|2020-08-09|
|2020-08-17|2020-08-19|       3|2020-08-06|2020-08-09|
|2020-08-18|2020-08-19|       4|2020-08-10|2020-08-11|
|2020-08-19|2020-08-19|       5|2020-08-16|2020-08-19|
|2020-08-20|2020-08-19|       6|2020-08-20|2020-08-25|
|2020-08-21|2020-08-19|       7|2020-08-20|2020-08-21|
|2020-08-22|2020-08-19|       8|2020-08-19|2020-08-24|
|2020-08-23|2020-08-19|       9|2020-08-05|2020-08-09|
+----------+----------+--------+----------+----------+

其中col1和col2可能不是唯一的,并且Date仅是增量日期,而START是唯一的。 我的逻辑是,如果START == col2,lag(quantity,offset= datediff(col2,col1),0)否则为0。 在这种情况下,datediff(col2,col1)是3天。 尝试1。

from pyspark.sql.functions import when,col,datediff,expr
TEST_df = TEST_df.withColumn('datedifff',datediff(col('col2'),col('col1')))\
                  .withColumn('want',expr("IF(START == col2,lag(quantity,datedifff,0),0)          "))

存在文字错误...

所以我的结果 df 如下所示:

enter image description here

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)