Problem description
srl_no created_on completed_on prev_completed_on time_from_last Dense_Rank
XXXXXX1 2020-10-09T08:52:25 2020-10-09T08:57:45 null null 1
XXXXXX1 2020-10-09T09:04:32 2020-10-09T09:06:37 2020-10-09T08:57:45 407 2
XXXXXX1 2020-10-09T09:10:10 2020-10-09T09:12:17 2020-10-09T09:06:37 213 3
XXXXXX1 2020-10-09T09:10:10 2020-10-09T09:12:17 2020-10-09T09:12:17 -127 3
I want to subtract prev_completed_on from created_on to get time_from_last, but because the last two rows share the same created_on and completed_on, lag() picks up the duplicate row's own completed_on and produces a negative duration. In that case I need to take the value from the previous group instead, based on the dense_rank column: for the fourth row above, I need to subtract the completed_on of the rank-2 row (the second row) from that row's created_on.
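The arithmetic behind the two values can be checked directly: subtracting the duplicate row's own completed_on gives the negative -127, while subtracting the completed_on of the previous dense_rank group gives the desired 213. A quick check in plain Python (variable names are illustrative):

```python
from datetime import datetime

fmt = "%Y-%m-%dT%H:%M:%S"
created_on = datetime.strptime("2020-10-09T09:10:10", fmt)
# lag() on the duplicate row returns the other duplicate's completed_on
own_prev = datetime.strptime("2020-10-09T09:12:17", fmt)
# completed_on of the previous dense_rank group (the rank-2 row)
rank2_completed = datetime.strptime("2020-10-09T09:06:37", fmt)

print(int((created_on - own_prev).total_seconds()))         # -127
print(int((created_on - rank2_completed).total_seconds()))  # 213
```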
My code so far:
from pyspark.sql import functions as f
from pyspark.sql.types import TimestampType, LongType
from pyspark.sql.window import Window

df = spark.createDataFrame(
    [
        ('XXXXXX1', '2020-10-09T08:52:25', '2020-10-09T08:57:45'),
        ('XXXXXX1', '2020-10-09T09:04:32', '2020-10-09T09:06:37'),
        ('XXXXXX1', '2020-10-09T09:10:10', '2020-10-09T09:12:17'),
        ('XXXXXX1', '2020-10-09T09:10:10', '2020-10-09T09:12:17'),  # duplicate row from the example
    ],
    ['srl_no', 'created_on', 'completed_on']
)
df = df.withColumn('created_on', f.col('created_on').cast(TimestampType()))
df = df.withColumn('completed_on', f.col('completed_on').cast(TimestampType()))

partition_cols = ["srl_no"]
window_clause = Window.partitionBy(partition_cols).orderBy(f.col('completed_on').asc())

# previous row's completed_on within each srl_no, ordered by completed_on
df1 = df.withColumn('prev_completed_on',
                    f.lag(f.col("completed_on")).over(window_clause))
df1 = df1.withColumn('dense_rank', f.dense_rank().over(window_clause))
df1 = df1.withColumn("time_from_last",
                     f.col("created_on").cast(LongType()) - f.col("prev_completed_on").cast(LongType()))
Expected output
srl_no created_on completed_on prev_completed_on time_from_last Dense_Rank
XXXXXX1 2020-10-09T08:52:25 2020-10-09T08:57:45 null null 1
XXXXXX1 2020-10-09T09:04:32 2020-10-09T09:06:37 2020-10-09T08:57:45 407 2
XXXXXX1 2020-10-09T09:10:10 2020-10-09T09:12:17 2020-10-09T09:06:37 213 3
XXXXXX1 2020-10-09T09:10:10 2020-10-09T09:12:17 2020-10-09T09:12:17 213 3
Solution
The trick here is to use a groupby to get the earliest prev_completed_on per srl_no and dense_rank. Joining that back onto the prepared dataframe gives the desired result.
from pyspark.sql import functions as F, types as T
from pyspark.sql.window import Window

df = spark.createDataFrame(
    [
        ('XXXXXX1', '2020-10-09T08:52:25', '2020-10-09T08:57:45'),
        ('XXXXXX1', '2020-10-09T09:04:32', '2020-10-09T09:06:37'),
        ('XXXXXX1', '2020-10-09T09:10:10', '2020-10-09T09:12:17'),
        ('XXXXXX1', '2020-10-09T09:10:10', '2020-10-09T09:12:17'),  # duplicate row from the example
    ],
    ['srl_no', 'created_on', 'completed_on']
)
df = df.withColumn('created_on', F.col('created_on').cast(T.TimestampType()))
df = df.withColumn('completed_on', F.col('completed_on').cast(T.TimestampType()))

partition_cols = ["srl_no"]
window_clause = Window.partitionBy(partition_cols).orderBy(F.col('completed_on').asc())

# previous completed_on within each srl_no, plus a dense rank over completed_on
df_with_rank = df.withColumn('prev_completed_on',
                             F.lag(F.col("completed_on")).over(window_clause))
df_with_rank = df_with_rank.withColumn('dense_rank', F.dense_rank().over(window_clause))

# earliest prev_completed_on per (srl_no, dense_rank): duplicate rows all
# receive the previous group's completed_on instead of each other's
dense_rank = df_with_rank.groupby("srl_no", "dense_rank") \
    .agg(F.min('prev_completed_on').alias('prev_completed_on'))

df_with_rank = df_with_rank.drop('prev_completed_on')
df_with_rank = df_with_rank.join(dense_rank, ["srl_no", "dense_rank"], 'left')
df_with_rank.show()
Output:
+-------+----------+-------------------+-------------------+-------------------+
| srl_no|dense_rank|         created_on|       completed_on|  prev_completed_on|
+-------+----------+-------------------+-------------------+-------------------+
|XXXXXX1|         1|2020-10-09 08:52:25|2020-10-09 08:57:45|               null|
|XXXXXX1|         2|2020-10-09 09:04:32|2020-10-09 09:06:37|2020-10-09 08:57:45|
|XXXXXX1|         3|2020-10-09 09:10:10|2020-10-09 09:12:17|2020-10-09 09:06:37|
|XXXXXX1|         3|2020-10-09 09:10:10|2020-10-09 09:12:17|2020-10-09 09:06:37|
+-------+----------+-------------------+-------------------+-------------------+
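From here, time_from_last follows with the same cast-to-long subtraction as in the question, e.g. F.col("created_on").cast(T.LongType()) - F.col("prev_completed_on").cast(T.LongType()). As a sanity check of the end-to-end logic, here is the same computation restated in plain Python without Spark (ranking rows by distinct completed_on and always subtracting the previous group's completed_on); names and structure are illustrative, not part of the original answer:

```python
from datetime import datetime

rows = [
    ("XXXXXX1", "2020-10-09T08:52:25", "2020-10-09T08:57:45"),
    ("XXXXXX1", "2020-10-09T09:04:32", "2020-10-09T09:06:37"),
    ("XXXXXX1", "2020-10-09T09:10:10", "2020-10-09T09:12:17"),
    ("XXXXXX1", "2020-10-09T09:10:10", "2020-10-09T09:12:17"),
]

def parse(s):
    return datetime.strptime(s, "%Y-%m-%dT%H:%M:%S")

# group rows by srl_no (the window partition key)
by_key = {}
for srl_no, created, completed in rows:
    by_key.setdefault(srl_no, []).append((parse(created), parse(completed)))

results = []
for srl_no, items in by_key.items():
    items.sort(key=lambda t: t[1])
    # dense_rank over completed_on = position among distinct completed_on values
    distinct_completed = sorted({c for _, c in items})
    rank_of = {c: i + 1 for i, c in enumerate(distinct_completed)}
    for created, completed in items:
        rank = rank_of[completed]
        # prev_completed_on = completed_on of the previous dense_rank group
        prev = distinct_completed[rank - 2] if rank > 1 else None
        delta = int((created - prev).total_seconds()) if prev is not None else None
        results.append((srl_no, rank, delta))

print(results)
# [('XXXXXX1', 1, None), ('XXXXXX1', 2, 407), ('XXXXXX1', 3, 213), ('XXXXXX1', 3, 213)]
```

Both duplicate rows now get 213 rather than one of them going negative, matching the expected output above.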