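Time-efficient gap filling in a DataFrame using .NET for Spark

Problem description

I want to fill the gaps in my DataFrame using .NET for Spark.

The current DataFrame (rawData) contains per-minute data between reportFrom and reportTo: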

DateTime reportFrom = new DateTime(2021, 3, 4, 0, 0, 0);
DateTime reportTo = new DateTime(2021, 3, 5, 0, 0, 0);

Some intervals are missing, and I want to fill them with the last known value.

+----+-----+---+----+------+------------------+--------------------+------------------+
|Year|Month|Day|Hour|Minute|Id                |                Type|             Value|
+----+-----+---+----+------+------------------+--------------------+------------------+
|2021|    3|  4|   0|     0|                87|               Power|               0.0|
|2021|    3|  4|   0|     1|                87|               Power|               0.0|
|2021|    3|  4|   0|     2|                87|               Power|               0.0|
...
|2021|    3|  4|  14|     2|                87|               Power|             380.0|
|2021|    3|  4|  14|     3|                87|               Power|             380.0|
|2021|    3|  4|  14|     4|                87|               Power|             380.0|
|2021|    3|  4|  14|     5|                87|               Power|             380.0|
|2021|    3|  4|  14|     7|                87|               Power|             380.0|
...
|2021|    3|  4|  22|     7|                87|               Power|               0.0|

The result I expect after the first step (inserting the missing minutes) is:

+----+-----+---+----+------+------------------+--------------------+------------------+
|Year|Month|Day|Hour|Minute|Id                |                Type|             Value|
+----+-----+---+----+------+------------------+--------------------+------------------+
|2021|    3|  4|   0|     0|                87|               Power|               0.0|
|2021|    3|  4|   0|     1|                87|               Power|               0.0|
|2021|    3|  4|   0|     2|                87|               Power|               0.0|
...
|2021|    3|  4|  14|     2|                87|               Power|             380.0|
|2021|    3|  4|  14|     3|                87|               Power|             380.0|
|2021|    3|  4|  14|     4|                87|               Power|             380.0|
|2021|    3|  4|  14|     5|                87|               Power|             380.0|
|2021|    3|  4|  14|     6|              null|                null|              null|
|2021|    3|  4|  14|     7|                87|               Power|             380.0|
|2021|    3|  4|  14|     8|              null|                null|              null|
...
|2021|    3|  4|  23|    59|              null|                null|              null|               

So far I have been creating a new DataFrame containing every minute and then performing a left outer join on the two DataFrames:

int inc = 1;
List<DateTime> timeList = new List<DateTime>();
while (reportFrom < reportTo)
{
    timeList.Add(reportFrom);
    reportFrom = reportFrom.AddMinutes(inc);
}    

var toFillTime0 = new List<object> { -1, 0, 0, 0, 0 };

var dataToFill = spark.CreateDataFrame(
    new List<GenericRow> { new GenericRow(toFillTime0.ToArray()) },
    new StructType(                     // schema
    new List<StructField>()
    {
        new StructField("Year0", new IntegerType()),
        new StructField("Month0", new IntegerType()),
        new StructField("Day0", new IntegerType()),
        new StructField("Hour0", new IntegerType()),
        new StructField("Minute0", new IntegerType())
    }));

foreach (DateTime time in timeList)
{

    var toFillTime = new List<object> { time.Year,time.Month,time.Day,time.Hour,time.Minute };

    var dataToFillt = spark.CreateDataFrame(
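        new List<GenericRow> { new GenericRow(toFillTime.ToArray()) },
        new StructType(                     // schema
        new List<StructField>()
        {
            new StructField("Year0", new IntegerType()),
            new StructField("Month0", new IntegerType()),
            new StructField("Day0", new IntegerType()),
            new StructField("Hour0", new IntegerType()),
            new StructField("Minute0", new IntegerType())
        }));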

    dataToFill = dataToFill.Union(dataToFillt);

}

dataToFill = dataToFill.Filter("Year0 > 0");    

var toFillReportDataReq = dataToFill.Join(rawData,
    dataToFill["Year0"] == rawData["Year"] & dataToFill["Month0"] == rawData["Month"] & dataToFill["Day0"] == rawData["Day"]
    & dataToFill["Hour0"] == rawData["Hour"] & dataToFill["Minute0"] == rawData["Minute"],
    "left_outer");

A few rows of toFillReportDataReq look like this:

|2021|    3|  4|  22|     4|                87|               Power|               0.0|
|2021|    3|  4|  22|     5|                87|               Power|               0.0|
|2021|    3|  4|  22|     6|                87|               Power|               0.0|
|2021|    3|  4|  22|     7|                87|               Power|               0.0|
|2021|    3|  4|  22|     8|              null|                null|              null|
|2021|    3|  4|  22|     9|              null|                null|              null|
|2021|    3|  4|  22|    10|              null|                null|              null|
|2021|    3|  4|  22|    11|              null|                null|              null|
|2021|    3|  4|  22|    12|              null|                null|              null|
|2021|    3|  4|  22|    13|              null|                null|              null|
|2021|    3|  4|  22|    14|              null|                null|              null|

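Replacing the nulls in the Value column is already handled with a window and the last function.

The nulls in the Id and Type columns are replaced with var id = 87 and "Power" using: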
toFillReportDataReq = toFillReportDataReq.WithColumn("Id",Functions.When(toFillReportDataReq["Id"].IsNull(),id)
   .Otherwise(toFillReportDataReq["Id"]))
   .WithColumn("Type",Functions.When(toFillReportDataReq["Type"].IsNull(),"Power")
    .Otherwise(toFillReportDataReq["Type"]));

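This approach returns the result I want, but it is very time-consuming (inefficient).

My questions are:

  • Is there a more efficient way to create a new DataFrame containing every minute between the specified start and end?
  • Is there any way to avoid the join in this approach?
  • What is the best way to set every value in the Id column to id and every value in the Type column to "Power"?

Thanks!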

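Solution

This is the approach I would take:

  1. Build a DataFrame with one row for every minute (I use spark.Range to project one row for each minute I need)
  2. For each id in the Range, add that many minutes to the start date
  3. Join the dates to the original DataFrame with a left_outer join so that you don't lose any rows
  4. Then use Last to fill any gaps - note that if the data starts with a null, newValue stays null until some data arrives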
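
Steps 1 and 2 come down to simple epoch arithmetic, and it can help to check that arithmetic in plain C# before involving Spark. The following is only a minimal sketch: the class and method names (MinuteGrid, Build) are hypothetical and not part of .NET for Spark, and it assumes the report window is a whole number of minutes.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, for illustration only (not part of .NET for Spark).
public static class MinuteGrid
{
    // Returns one timestamp per minute in [from, to), computed the same way
    // the Spark version does it: epoch seconds of the start plus id * 60.
    public static List<DateTime> Build(DateTime from, DateTime to)
    {
        var epoch = new DateTime(1970, 1, 1);
        long baseSeconds = (long)(from - epoch).TotalSeconds;
        long minutes = (long)(to - from).TotalMinutes;

        var result = new List<DateTime>();
        for (long id = 0; id < minutes; id++)
        {
            // Multiplying by 60 moves in minutes; without it we would step in seconds.
            result.Add(epoch.AddSeconds(baseSeconds + id * 60));
        }
        return result;
    }
}
```

For a window from 07:00 to 09:00 this yields 120 timestamps, the first at 07:00 and the last at 08:59, which is the row count spark.Range should be asked for. The full Spark program follows.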
using System;
using System.Collections.Generic;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Expressions;
using Microsoft.Spark.Sql.Types;

namespace StackOverflow
{
    class Program
    {
        static void Main(string[] args)
        {
            var spark = SparkSession.Builder().GetOrCreate();
            
            //Sample data set - we will fill in the missing minutes
            var df = spark.CreateDataFrame(new List<GenericRow>()
            {
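                // NOTE: the sample rows were partially lost in the source text; the minute
                // values and null rows below are an illustrative reconstruction.
                new GenericRow(new object[] {2021, 3, 4, 8, 4, 87, "Type1", 380.5}),
                new GenericRow(new object[] {2021, 3, 4, 8, 10, null, null, null}),
                new GenericRow(new object[] {2021, 3, 4, 8, 20, null, null, null}),
                new GenericRow(new object[] {2021, 3, 4, 8, 25, null, null, null}),
                new GenericRow(new object[] {2021, 3, 4, 8, 35, 87, "Type1", 0.0}),
                new GenericRow(new object[] {2021, 3, 4, 8, 45, 87, "Type1", 0.0})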
            },new StructType(new List<StructField>()
            {
                new StructField("Year", new IntegerType()),
                new StructField("Month", new IntegerType()),
                new StructField("Day", new IntegerType()),
                new StructField("Hour", new IntegerType()),
                new StructField("Minute", new IntegerType()),
                new StructField("ID", new IntegerType()),
                new StructField("Type", new StringType()),
                new StructField("Value", new DoubleType())
            }));
            
            //start and end time
            var reportFrom = new DateTime(2021, 3, 4, 7, 0, 0);
            var reportTo = new DateTime(2021, 3, 4, 9, 0, 0);
            
            //convert start time to unix epoch as we can't pass a DateTime to spark (yet!)
            var unixFromTime = (reportFrom - new DateTime(1970, 1, 1)).TotalSeconds;
            
            //how many total rows do we need?
            var minutesToCreate = reportTo.Subtract(reportFrom).TotalMinutes;
            
            //create a dataframe with 1 row for every minute we need
            var everyMinute = spark.Range((long) minutesToCreate);
            
            //Add the reportFrom unix epoch
            everyMinute = everyMinute.WithColumn("BaseTime",Functions.Lit(unixFromTime));
            
            //add to the unix epoch,the Id (incrementing number) multiplied by 60 - if we didn't mul(60) it would add seconds and not minutes
            everyMinute = everyMinute.WithColumn("Time",Functions.Lit(unixFromTime)
                    .Plus(Functions.Col("Id").Cast("Int").Multiply(Functions.Lit(60))));
            
            //convert the unix epoch to an actual timestamp and drop all the intermediate columns
            everyMinute = everyMinute.WithColumn("Date",Functions.ToTimestamp(Functions.FromUnixTime(Functions.Col("Time")))).Select("Date");
                
            //convert timestamp into individual columns

            everyMinute = everyMinute.WithColumn("Year",Functions.Year(Functions.Col("Date")));
            everyMinute = everyMinute.WithColumn("Month",Functions.Month(Functions.Col("Date")));
            everyMinute = everyMinute.WithColumn("Day",Functions.DayOfMonth(Functions.Col("Date")));
            everyMinute = everyMinute.WithColumn("Hour",Functions.Hour(Functions.Col("Date")));
            everyMinute = everyMinute.WithColumn("Minute",Functions.Minute(Functions.Col("Date")));

            //join both data frames so...
            var dfAllData = everyMinute.Join(df,new List<string>() {"Year","Month","Day","Hour","Minute"},"left_outer");
            
            //add in data using Last
            var window = Window.OrderBy("Year", "Month", "Day", "Hour", "Minute");
            var filledDataFrame = dfAllData.WithColumn("newValue",Functions.When(dfAllData["Value"].IsNull(),Functions.Last(dfAllData["Value"],true).Over(window))
                    .Otherwise(dfAllData["Value"]));

            filledDataFrame.Show(1000,10000);
        }
    }
}
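
The program above does not cover the third question (setting Id and Type on the inserted rows). One possible way, sketched here under assumptions, is Functions.Coalesce, which returns the first non-null argument. The helper name FillIdAndType and its parameters are hypothetical, and the column names follow the question ("Id", "Type") rather than the sample above.

```csharp
using Microsoft.Spark.Sql;

// Hypothetical helper, for illustration only.
public static class ConstantFill
{
    // Replaces nulls in the "Id" and "Type" columns of the joined DataFrame
    // with the given constants; existing non-null values are left untouched.
    public static DataFrame FillIdAndType(DataFrame joined, int id, string type)
    {
        return joined
            .WithColumn("Id", Functions.Coalesce(Functions.Col("Id"), Functions.Lit(id)))
            .WithColumn("Type", Functions.Coalesce(Functions.Col("Type"), Functions.Lit(type)));
    }
}
```

It would be called as ConstantFill.FillIdAndType(toFillReportDataReq, 87, "Power"). If every row is meant to carry the same Id and Type anyway, WithColumn("Id", Functions.Lit(id)) on its own should be enough and avoids the null check.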
