问题描述
我想使用 .NET for Spark 填补 DataFrame 中的空白。
当前的 DataFrame（rawData）包含 reportFrom 和 reportTo 之间按分钟记录的数据：
// Report window [reportFrom, reportTo): every minute of 2021-03-04
// (the expected output below runs from 00:00 up to 23:59).
DateTime reportFrom = new DateTime(2021, 3, 4, 0, 0, 0);
DateTime reportTo = new DateTime(2021, 3, 5, 0, 0, 0);
其中缺少一些分钟的数据，我想用最后一个已知值填充它们。
+----+-----+---+----+------+------------------+--------------------+------------------+
|Year|Month|Day|Hour|Minute|Id | Type| Value|
+----+-----+---+----+------+------------------+--------------------+------------------+
|2021| 3| 4| 0| 0| 87| Power| 0.0|
|2021| 3| 4| 0| 1| 87| Power| 0.0|
|2021| 3| 4| 0| 2| 87| Power| 0.0|
...
|2021| 3| 4| 14| 2| 87| Power| 380.0|
|2021| 3| 4| 14| 3| 87| Power| 380.0|
|2021| 3| 4| 14| 4| 87| Power| 380.0|
|2021| 3| 4| 14| 5| 87| Power| 380.0|
|2021| 3| 4| 14| 7| 87| Power| 380.0|
...
|2021| 3| 4| 22| 7| 87| Power| 0.0|
第一步（插入缺失的分钟）之后，我期望得到的结果是：
+----+-----+---+----+------+------------------+--------------------+------------------+
|Year|Month|Day|Hour|Minute|Id | Type| Value|
+----+-----+---+----+------+------------------+--------------------+------------------+
|2021| 3| 4| 0| 0| 87| Power| 0.0|
|2021| 3| 4| 0| 1| 87| Power| 0.0|
|2021| 3| 4| 0| 2| 87| Power| 0.0|
...
|2021| 3| 4| 14| 2| 87| Power| 380.0|
|2021| 3| 4| 14| 3| 87| Power| 380.0|
|2021| 3| 4| 14| 4| 87| Power| 380.0|
|2021| 3| 4| 14| 5| 87| Power| 380.0|
|2021| 3| 4| 14| 6| null| null| null|
|2021| 3| 4| 14| 7| 87| Power| 380.0|
|2021| 3| 4| 14| 8| null| null| null|
...
|2021| 3| 4| 23| 59| null| null| null|
到目前为止，我的做法是先创建一个包含所有分钟的新 DataFrame，然后对两个 DataFrame 执行 left outer join。
// Collect every minute in [reportFrom, reportTo). A separate cursor is used so that
// reportFrom still holds the window start after the loop.
int inc = 1;
List<DateTime> timeList = new List<DateTime>();
for (DateTime cursor = reportFrom; cursor < reportTo; cursor = cursor.AddMinutes(inc))
{
    timeList.Add(cursor);
}

// Schema of the minute grid. The "0" suffix keeps these columns distinct from
// rawData's Year/Month/Day/Hour/Minute after the join.
var minuteSchema = new StructType(new List<StructField>()
{
    new StructField("Year0", new IntegerType()),
    new StructField("Month0", new IntegerType()),
    new StructField("Day0", new IntegerType()),
    new StructField("Hour0", new IntegerType()),
    new StructField("Minute0", new IntegerType()),
});

// Build the whole grid with a single CreateDataFrame call. Creating a one-row DataFrame
// per minute and Union-ing it on grows the query plan with every minute, which is the
// main cost of the per-minute approach. It also removes the need for a dummy seed row
// that must be filtered out afterwards.
var minuteRows = new List<GenericRow>(timeList.Count);
foreach (DateTime time in timeList)
{
    minuteRows.Add(new GenericRow(new object[] { time.Year, time.Month, time.Day, time.Hour, time.Minute }));
}
var dataToFill = spark.CreateDataFrame(minuteRows, minuteSchema);

// left_outer keeps every minute of the grid; minutes with no rawData row get nulls
// in rawData's columns. The condition compares the grid with rawData, the joined DataFrame.
var toFillReportDataReq = dataToFill.Join(
    rawData,
    dataToFill["Year0"].EqualTo(rawData["Year"])
        .And(dataToFill["Month0"].EqualTo(rawData["Month"]))
        .And(dataToFill["Day0"].EqualTo(rawData["Day"]))
        .And(dataToFill["Hour0"].EqualTo(rawData["Hour"]))
        .And(dataToFill["Minute0"].EqualTo(rawData["Minute"])),
    "left_outer");
toFillReportDataReq 中的几行如下所示：
|2021| 3| 4| 22| 4| 87| Power| 0.0|
|2021| 3| 4| 22| 5| 87| Power| 0.0|
|2021| 3| 4| 22| 6| 87| Power| 0.0|
|2021| 3| 4| 22| 7| 87| Power| 0.0|
|2021| 3| 4| 22| 8| null| null| null|
|2021| 3| 4| 22| 9| null| null| null|
|2021| 3| 4| 22| 10| null| null| null|
|2021| 3| 4| 22| 11| null| null| null|
|2021| 3| 4| 22| 12| null| null| null|
|2021| 3| 4| 22| 13| null| null| null|
|2021| 3| 4| 22| 14| null| null| null|
Value 列中空值的替换已经通过 last 函数和 window 函数解决。
对于 Id 和 Type 列，我使用下面的代码把空值分别替换为 var id = 87 和 "Power"：
// Fill only the missing cells: Coalesce returns the column's own value when it is
// non-null and the literal fallback otherwise, so existing Id/Type values are untouched.
var filledId = Functions.Coalesce(toFillReportDataReq["Id"], Functions.Lit(id));
var filledType = Functions.Coalesce(toFillReportDataReq["Type"], Functions.Lit("Power"));

toFillReportDataReq = toFillReportDataReq
    .WithColumn("Id", filledId)
    .WithColumn("Type", filledType);
这个方法能得到我想要的结果，但非常耗时（效率低下）。
我的问题如下：
- 有没有更高效的方法来创建包含指定区间内所有分钟的新 DataFrame？
- 有没有办法在这种方法中避免使用 join？
- 将 Id 列的所有值设为 id、将 Type 列的所有值设为 "Power"，最好的方法是什么？
谢谢！
解决方法
我会采用下面的方法：
- 构建一个每分钟一行的 DataFrame（用 spark.Range 为所需的每一分钟生成一行）
- 对 Range 生成的每个 id，在开始时间上加上 id 分钟
- 使用 left_outer join 将这些时间与原始 DataFrame 关联，这样不会丢失任何一分钟
- 然后使用 Last（忽略 null）配合窗口函数填补空白——注意：如果数据一开始就是 null，那么在出现第一个非 null 值之前，newValue 会一直是 null
using System;
using System.Collections.Generic;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Expressions;
using Microsoft.Spark.Sql.Types;
namespace StackOverflow
{
    /// <summary>
    /// Fills minute-level gaps in a DataFrame: builds one row per minute of the report
    /// window, left-joins the real data onto it, then forward-fills missing values.
    /// </summary>
    class Program
    {
        // Columns identifying one minute; the grid and the data are joined on these.
        private static readonly string[] TimeKeyColumns = { "Year", "Month", "Day", "Hour", "Minute" };

        static void Main(string[] args)
        {
            var spark = SparkSession.Builder().GetOrCreate();

            // CreateMinuteGrid treats the report times as UTC when converting to epoch seconds,
            // while FromUnixTime/Year/Hour render timestamps in the session time zone. Pin the
            // session to UTC so both agree. On a non-UTC cluster the generated minutes would
            // otherwise be shifted by the local offset and the join would match the wrong rows.
            spark.Conf().Set("spark.sql.session.timeZone", "UTC");

            // Sample data set - we will fill in the missing minutes.
            DataFrame df = CreateSampleData(spark);

            // Report window [reportFrom, reportTo).
            var reportFrom = new DateTime(2021, 3, 4, 7, 0, 0);
            var reportTo = new DateTime(2021, 3, 4, 9, 0, 0);

            DataFrame everyMinute = CreateMinuteGrid(spark, reportFrom, reportTo);

            // left_outer keeps every generated minute, including minutes with no input row.
            DataFrame dfAllData = everyMinute.Join(df, TimeKeyColumns, "left_outer");

            DataFrame filledDataFrame = ForwardFill(dfAllData);
            filledDataFrame.Show(1000, 10000);
        }

        /// <summary>
        /// Creates a small sample DataFrame with gaps between its minutes.
        /// Row with null ID/Type/Value is included to show that real nulls are filled too.
        /// </summary>
        private static DataFrame CreateSampleData(SparkSession spark)
        {
            var rows = new List<GenericRow>()
            {
                new GenericRow(new object[] { 2021, 3, 4, 8, 8, 87, "Type1", 380.5 }),
                new GenericRow(new object[] { 2021, 3, 4, 8, 10, null, null, null }),
                new GenericRow(new object[] { 2021, 3, 4, 8, 20, 87, "Type1", 380.5 }),
                new GenericRow(new object[] { 2021, 3, 4, 8, 25, 87, "Type1", 380.5 }),
                new GenericRow(new object[] { 2021, 3, 4, 8, 35, 87, "Type1", 0.0 }),
                new GenericRow(new object[] { 2021, 3, 4, 8, 45, 87, "Type1", 0.0 }),
            };

            var schema = new StructType(new List<StructField>()
            {
                new StructField("Year", new IntegerType()),
                new StructField("Month", new IntegerType()),
                new StructField("Day", new IntegerType()),
                new StructField("Hour", new IntegerType()),
                new StructField("Minute", new IntegerType()),
                new StructField("ID", new IntegerType()),
                new StructField("Type", new StringType()),
                new StructField("Value", new DoubleType()),
            });

            return spark.CreateDataFrame(rows, schema);
        }

        /// <summary>
        /// Builds one row per minute in [from, to) with a "Date" timestamp column plus its
        /// Year/Month/Day/Hour/Minute parts.
        /// </summary>
        /// <remarks>
        /// from and to are treated as UTC wall-clock times; the session time zone must be UTC
        /// for the extracted parts to equal them. A trailing partial minute is dropped.
        /// </remarks>
        private static DataFrame CreateMinuteGrid(SparkSession spark, DateTime from, DateTime to)
        {
            // Spark can't take a DateTime directly, so pass the start as unix epoch seconds.
            long fromEpochSeconds = new DateTimeOffset(DateTime.SpecifyKind(from, DateTimeKind.Utc)).ToUnixTimeSeconds();
            long minuteCount = (long)to.Subtract(from).TotalMinutes;

            // spark.Range emits a long "id" column 0..minuteCount-1. It stays a long so that
            // id * 60 cannot overflow on long windows; multiplying by 60 turns it into minutes.
            DataFrame grid = spark.Range(minuteCount)
                .WithColumn("Time", Functions.Lit(fromEpochSeconds).Plus(Functions.Col("id").Multiply(60)))
                .WithColumn("Date", Functions.ToTimestamp(Functions.FromUnixTime(Functions.Col("Time"))))
                .Select("Date");

            // Split the timestamp into the same key columns the input data uses.
            return grid
                .WithColumn("Year", Functions.Year(Functions.Col("Date")))
                .WithColumn("Month", Functions.Month(Functions.Col("Date")))
                .WithColumn("Day", Functions.DayOfMonth(Functions.Col("Date")))
                .WithColumn("Hour", Functions.Hour(Functions.Col("Date")))
                .WithColumn("Minute", Functions.Minute(Functions.Col("Date")));
        }

        /// <summary>
        /// Adds "newValue" (Value carried forward from the most recent non-null row) and fills
        /// null ID/Type cells the same way. Rows before the first non-null value stay null.
        /// </summary>
        private static DataFrame ForwardFill(DataFrame data)
        {
            // Order by the generated "Date" column, which sorts chronologically. With ORDER BY
            // and no explicit frame, the window spans from the first row to the current row, so
            // Last(..., ignoreNulls: true) yields the latest non-null value seen so far.
            // There is no PartitionBy, so Spark moves all rows into one partition; that is
            // acceptable for a bounded report window.
            WindowSpec window = Window.OrderBy("Date");

            return data
                .WithColumn("newValue", FillFromPrevious("Value", window))
                .WithColumn("ID", FillFromPrevious("ID", window))
                .WithColumn("Type", FillFromPrevious("Type", window));
        }

        // Keeps the column's own value when present, otherwise the last non-null value before it.
        private static Column FillFromPrevious(string columnName, WindowSpec window)
        {
            Column column = Functions.Col(columnName);
            return Functions.Coalesce(column, Functions.Last(column, true).Over(window));
        }
    }
}
编辑