.NET for Spark 的正向填充

问题描述

我正在查看 .NET (C#) 中 Spark DataFrame 的窗口函数

我有一个 DataFrame df,其中包含 Year、Month、Day、Hour、Minute、ID、Type 和 Value:

| 2021 |  3  |  4  |  8  |  9  |  87  |  Type1  |  380.5  |

| 2021 |  3  |  4  |  8  |  10 | null |   null  |   null  |

| 2021 |  3  |  4  |  8  |  11 | null |   null  |   null  |

| 2021 |  3  |  4  |  8  |  12 | null |   null  |   null  |

| 2021 |  3  |  4  |  8  |  13 |  87  |  Type1  |    0.0  |

| 2021 |  3  |  4  |  8  |  14 |  87  |  Type1  |    0.0  |

我想根据年、月、日、小时、分钟用前一行的值填充空行(空行),如下所示:

| 2021 |  3  |  4  |  8  |  9  |  87  |  Type1  |  380.5  |

| 2021 |  3  |  4  |  8  |  10 |  87  |  Type1  |  380.5  |

| 2021 |  3  |  4  |  8  |  11 |  87  |  Type1  |  380.5  |

| 2021 |  3  |  4  |  8  |  12 |  87  |  Type1  |  380.5  |

| 2021 |  3  |  4  |  8  |  13 |  87  |  Type1  |    0.0  |

| 2021 |  3  |  4  |  8  |  14 |  87  |  Type1  |    0.0  |

到目前为止,我在 Scala 中找到了使用 Windows 和 Lag 函数解决方案,但我不确定如何在 C# 中做到这一点。在 Scala 中,窗口将被定义为:

val window = Window.orderBy("Year","Month","Day","Hour","Minute")

我想使用

添加一个 newValue 列

var filledDataFrame = df.WithColumn("newValue",Functions.When(df["Value"].IsNull(),Functions.Lag(df["Value"],1).Over(window)).Otherwise(df["Value"])

如何在 .NET 中为 Spark 定义一个窗口并使用 Lag 函数向前填充空值?

解决方法

要为 Apache Spark 使用 Lag 和带有 .NET 的窗口,您非常接近并且需要:

var spark = SparkSession.Builder().GetOrCreate();
var df = spark.CreateDataFrame(new List<GenericRow>()
{
    new GenericRow(new object[] {2021,3,4,8,9,87,"Type1",380.5}),new GenericRow(new object[] {2021,10,null,null}),11,12,13,0.0}),14,0.0})
},new StructType(new List<StructField>()
{
    new StructField("Year",new IntegerType()),new StructField("Month",new StructField("Day",new StructField("Hour",new StructField("Minute",new StructField("ID",new StructField("Type",new StringType()),new StructField("Value",new DoubleType()),}));

var window = Window.OrderBy("Year","Month","Day","Hour","Minute");
var filledDataFrame = df.WithColumn("newValue",Functions.When(df["Value"].IsNull(),Functions.Lag(df["Value"],1).Over(window))
        .Otherwise(df["Value"]));

filledDataFrame.Show(1000,10000);

这将导致:

+----+-----+---+----+------+----+-----+-----+--------+
|Year|Month|Day|Hour|Minute|  ID| Type|Value|newValue|
+----+-----+---+----+------+----+-----+-----+--------+
|2021|    3|  4|   8|     9|  87|Type1|380.5|   380.5|
|2021|    3|  4|   8|    10|null| null| null|   380.5|
|2021|    3|  4|   8|    11|null| null| null|    null|
|2021|    3|  4|   8|    12|null| null| null|    null|
|2021|    3|  4|   8|    13|  87|Type1|  0.0|     0.0|
|2021|    3|  4|   8|    14|  87|Type1|  0.0|     0.0|
+----+-----+---+----+------+----+-----+-----+--------+

但您可能需要 Last 而不是 Lag,因为您可以跳过空值:

var spark = SparkSession.Builder().GetOrCreate();
var df = spark.CreateDataFrame(new List<GenericRow>()
{
    new GenericRow(new object[] {2021,Functions.Last(df["Value"],true).Over(window))
        .Otherwise(df["Value"]));

filledDataFrame.Show(1000,10000);

结果:

+----+-----+---+----+------+----+-----+-----+--------+
|Year|Month|Day|Hour|Minute|  ID| Type|Value|newValue|
+----+-----+---+----+------+----+-----+-----+--------+
|2021|    3|  4|   8|     9|  87|Type1|380.5|   380.5|
|2021|    3|  4|   8|    10|null| null| null|   380.5|
|2021|    3|  4|   8|    11|null| null| null|   380.5|
|2021|    3|  4|   8|    12|null| null| null|   380.5|
|2021|    3|  4|   8|    13|  87|Type1|  0.0|     0.0|
|2021|    3|  4|   8|    14|  87|Type1|  0.0|     0.0|
+----+-----+---+----+------+----+-----+-----+--------+

希望能帮到你!

编辑

(使这项工作所需的 using 语句)

using System;
using System.Collections.Generic;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Expressions;
using Microsoft.Spark.Sql.Types;

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...