Problem description
I am looking at window functions for Spark DataFrames in .NET (C#).
I have a DataFrame df with the columns Year, Month, Day, Hour, Minute, ID, Type and Value:
| 2021 | 3 | 4 | 8 | 9 | 87 | Type1 | 380.5 |
| 2021 | 3 | 4 | 8 | 10 | null | null | null |
| 2021 | 3 | 4 | 8 | 11 | null | null | null |
| 2021 | 3 | 4 | 8 | 12 | null | null | null |
| 2021 | 3 | 4 | 8 | 13 | 87 | Type1 | 0.0 |
| 2021 | 3 | 4 | 8 | 14 | 87 | Type1 | 0.0 |
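I would like to fill the empty (null) rows with the values from the previous row, ordered by Year, Month, Day, Hour and Minute, like so: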
| 2021 | 3 | 4 | 8 | 9 | 87 | Type1 | 380.5 |
| 2021 | 3 | 4 | 8 | 10 | 87 | Type1 | 380.5 |
| 2021 | 3 | 4 | 8 | 11 | 87 | Type1 | 380.5 |
| 2021 | 3 | 4 | 8 | 12 | 87 | Type1 | 380.5 |
| 2021 | 3 | 4 | 8 | 13 | 87 | Type1 | 0.0 |
| 2021 | 3 | 4 | 8 | 14 | 87 | Type1 | 0.0 |
So far I have found solutions in Scala that use Window and the Lag function, but I am not sure how to do this in C#. In Scala, the window would be defined as:
val window = Window.orderBy("Year","Month","Day","Hour","Minute")
I would like to add a newValue column using:
var filledDataFrame = df.WithColumn("newValue", Functions.When(df["Value"].IsNull(), Functions.Lag(df["Value"], 1).Over(window)).Otherwise(df["Value"]));
How do I define a window in .NET for Spark and use the Lag function to forward-fill the null values?
Solution
You are very close. To use Lag with a window in .NET for Apache Spark, you need:
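var spark = SparkSession.Builder().GetOrCreate();
var df = spark.CreateDataFrame(new List<GenericRow>()
{
    new GenericRow(new object[] {2021, 3, 4, 8, 9, 87, "Type1", 380.5}),
    new GenericRow(new object[] {2021, 3, 4, 8, 10, null, null, null}),
    new GenericRow(new object[] {2021, 3, 4, 8, 11, null, null, null}),
    new GenericRow(new object[] {2021, 3, 4, 8, 12, null, null, null}),
    new GenericRow(new object[] {2021, 3, 4, 8, 13, 87, "Type1", 0.0}),
    new GenericRow(new object[] {2021, 3, 4, 8, 14, 87, "Type1", 0.0})
}, new StructType(new List<StructField>()
{
    new StructField("Year", new IntegerType()), new StructField("Month", new IntegerType()),
    new StructField("Day", new IntegerType()), new StructField("Hour", new IntegerType()),
    new StructField("Minute", new IntegerType()), new StructField("ID", new IntegerType()),
    new StructField("Type", new StringType()), new StructField("Value", new DoubleType())
}));
// Order the rows chronologically; Lag then looks exactly one row back.
var window = Window.OrderBy("Year", "Month", "Day", "Hour", "Minute");
var filledDataFrame = df.WithColumn("newValue",
    Functions.When(df["Value"].IsNull(), Functions.Lag(df["Value"], 1).Over(window))
        .Otherwise(df["Value"]));
filledDataFrame.Show(1000, 10000);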
This produces:
+----+-----+---+----+------+----+-----+-----+--------+
|Year|Month|Day|Hour|Minute|  ID| Type|Value|newValue|
+----+-----+---+----+------+----+-----+-----+--------+
|2021|    3|  4|   8|     9|  87|Type1|380.5|   380.5|
|2021|    3|  4|   8|    10|null| null| null|   380.5|
|2021|    3|  4|   8|    11|null| null| null|    null|
|2021|    3|  4|   8|    12|null| null| null|    null|
|2021|    3|  4|   8|    13|  87|Type1|  0.0|     0.0|
|2021|    3|  4|   8|    14|  87|Type1|  0.0|     0.0|
+----+-----+---+----+------+----+-----+-----+--------+
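Lag only looks one row back, so only the first null after a value gets filled. You probably want Last rather than Lag, because Last can skip nulls: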
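// spark, df and window are defined exactly as in the Lag example;
// this assignment replaces the filledDataFrame assignment there.
// The second argument (ignoreNulls: true) makes Last return the most recent non-null value.
var filledDataFrame = df.WithColumn("newValue",
    Functions.When(df["Value"].IsNull(), Functions.Last(df["Value"], true).Over(window))
        .Otherwise(df["Value"]));
filledDataFrame.Show(1000, 10000);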
Result:
+----+-----+---+----+------+----+-----+-----+--------+
|Year|Month|Day|Hour|Minute|  ID| Type|Value|newValue|
+----+-----+---+----+------+----+-----+-----+--------+
|2021|    3|  4|   8|     9|  87|Type1|380.5|   380.5|
|2021|    3|  4|   8|    10|null| null| null|   380.5|
|2021|    3|  4|   8|    11|null| null| null|   380.5|
|2021|    3|  4|   8|    12|null| null| null|   380.5|
|2021|    3|  4|   8|    13|  87|Type1|  0.0|     0.0|
|2021|    3|  4|   8|    14|  87|Type1|  0.0|     0.0|
+----+-----+---+----+------+----+-----+-----+--------+
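The question also asks for ID and Type to be filled, not only Value. A minimal sketch of one way to do that, assuming the same column names as in the question: the class name ForwardFillExample and the method ForwardFill are hypothetical, and the explicit RowsBetween frame is an assumption added here to make the "everything up to the current row" intent visible rather than relying on the default frame.

```csharp
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Expressions;

public static class ForwardFillExample
{
    // Hypothetical helper (not part of the original answer):
    // forward-fills the ID, Type and Value columns in place.
    public static DataFrame ForwardFill(DataFrame df)
    {
        // Frame covers every row from the start of the ordering up to the current row.
        WindowSpec window = Window
            .OrderBy("Year", "Month", "Day", "Hour", "Minute")
            .RowsBetween(Window.UnboundedPreceding, Window.CurrentRow);

        DataFrame filled = df;
        foreach (string name in new[] { "ID", "Type", "Value" })
        {
            // Last(..., true) ignores nulls, so each row receives the most
            // recent non-null value at or before it in the ordering.
            filled = filled.WithColumn(
                name, Functions.Last(Functions.Col(name), true).Over(window));
        }

        return filled;
    }
}
```

It could be called as ForwardFillExample.ForwardFill(df).Show(). Because Last already returns the current value when it is not null, the When/Otherwise wrapper is not needed in this variant. Keep in mind that a window with no PartitionBy moves all rows into a single partition, so this may be slow on large data.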
Hope this helps!
Edit
(the using statements needed to make this work)
using System;
using System.Collections.Generic;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Expressions;
using Microsoft.Spark.Sql.Types;