问题描述
我的应用程序从最大为1mb的Azure EventHubs中获取数据,将其转换为DataTable,然后将其另存为Parquet文件。
parquet.net生成的实木复合地板非常巨大,即使采用最佳压缩方法,也始终超过50mb。当我使用熊猫读取这个50mb的实木复合地板文件,然后将其重新写入另一个文件时,它变得小于500kb。
请参阅下面的parquet.net(RED)和Pyarrow(BLUE)之间的比较:
我们可以看到,列和行的数量是相同的。 我确实检查了内容,似乎还可以。 Obs:有一个varchar(8000)列包含大量数据。
这就是我获取实木复合地板元数据的方式:
import pandas as pd
import pyarrow.parquet as pq
# pd.set_option("max_colwidth",None)
# pd.set_option("max_seq_item",None)
# pd.set_option("min_rows",2)
# pd.set_option("max_rows",None)
# pd.set_option('max_columns',None)
parquet_file_net = pq.ParquetFile("parquetnetFile.parquet")
print(parquet_file_net.Metadata)
print()
parquet_file_py = pq.ParquetFile("pyarrowFile.parquet")
print(parquet_file_py.Metadata)
print()
print()
print(parquet_file_net.Metadata.row_group(0))
print()
print(parquet_file_py.Metadata.row_group(0))
我的c#代码基于以下代码,但是我做了一些更改: https://github.com/dazfuller/datatable-to-parquet
这是我的C#代码。
public static async Task<MemoryStream> ToParquetStream(DataTable dt)
{
var fields = GenerateSchema(dt);
var parquetStream = new MemoryStream();
using (var writer = new ParquetWriter(new Schema(fields),parquetStream))
{
writer.CompressionMethod = CompressionMethod.Gzip;
writer.CompressionLevel = 2;
var range = Enumerable.Range(0,dt.Columns.Count);
var result = await range.ForEachAsyncInParallel(async c =>
{
return await Task.Run(() =>
{
// Determine the target data type for the column
var targettype = dt.Columns[c].DataType;
if (targettype == typeof(DateTime))
{
targettype = typeof(DateTimeOffset);
}
// Generate the value type,this is to ensure it can handle null values
var valueType = targettype.IsClass ? targettype : typeof(Nullable<>).MakeGenericType(targettype);
// Create a list to hold values of the required type for the column
var valuesArray = Array.CreateInstance(valueType,dt.Rows.Count);
// Get the data to be written to the parquet stream
for (int r = 0; r < dt.Rows.Count; r++)
{
DaTarow row = dt.Rows[r];
// Check if value is null,if so then add a null value
if (row[c] == null || row[c] == dbnull.Value)
{
valuesArray.SetValue(null,r);
}
else
{
// Add the value to the list,but if it's a DateTime then create it as a DateTimeOffset first
if (dt.Columns[c].DataType == typeof(DateTime))
{
valuesArray.SetValue(new DateTimeOffset((DateTime)row[c]),r);
}
else
{
valuesArray.SetValue(row[c],r);
}
}
}
return valuesArray;
});
});
using (var rgw = writer.CreateRowGroup())
{
for (int c = 0; c < dt.Columns.Count; c++)
{
rgw.WriteColumn(new Parquet.Data.DataColumn(fields[c],result[c]));
}
}
}
return parquetStream;
}
private static List<datafield> GenerateSchema(DataTable dt)
{
var fields = new List<datafield>(dt.Columns.Count);
foreach (DataColumn column in dt.Columns)
{
// Attempt to parse the type of column to a parquet data type
var success = Enum.TryParse<DataType>(column.DataType.Name,true,out var type);
// If the parse was not successful and it's source is a DateTime then use a DateTimeOffset,otherwise default to a string
if (!success && column.DataType == typeof(DateTime))
{
type = DataType.DateTimeOffset;
}
// In c# float is System.Single. That is why the parse fails
else if (!success && column.DataType == typeof(float))
{
type = DataType.Float;
}
else if (!success)
{
type = DataType.String;
}
fields.Add(new datafield(column.ColumnName,type));
}
return fields;
}
public static async Task<R[]> ForEachAsyncInParallel<T,R>(this IEnumerable<T> list,Func<T,Task<R>> func)
{
var tasks = new List<Task<R>>();
foreach (var value in list)
{
tasks.Add(func(value));
}
return await Task.WhenAll<R>(tasks);
}
那文件为什么这么大?
以下是parquet.net和pyarrow生成的文件:https://easyupload.io/m/28jo48
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)