如何基于另一列的值填充Spark DataFrame列?

问题描述

我有一个用例,需要从dataframe中选择至少包含30列和数百万行的列。

我正在使用cassandrascalaapache-spark表中加载此数据。

我使用df.select("col1","col2","col3","col4")

选择了必填列

现在,我必须执行基本的groupBy操作,才能根据src_ipsrc_portdst_ipdst_port对数据进行分组,而且我还想具有原始received_time的{​​{1}}列中的最新值。

Original DataFrame

我想要一个dataframe dataframe值的distinct及其值src_ip和最新的count在新列中作为received_time

我知道如何使用last_seen,而且我认为可以在这里使用.withColumn。 由于我在这个领域还比较陌生,所以我真的不知道该如何进一步。我真的可以使用您的帮助来完成此任务。

解决方法

假设您的数据帧df为src_ip,src_port,dst_ip,dst_port and received_time,则可以尝试:

val mydf = df.groupBy(col("src_ip"),col("src_port"),col("dst_ip"),col("dst_port")).agg(count("received_time").as("row_count"),max(col("received_time")).as("max_received_time"))

上一行计算按列分组接收的时间戳计数以及该列分组的最大时间戳。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...