与pyarrow相比,Parquet.NET正在生成巨大的Parquet文件

问题描述

我的应用程序从最大为1mb的Azure EventHubs中获取数据,将其转换为DataTable,然后将其另存为Parquet文件

parquet.net生成的实木复合地板非常巨大,即使采用最佳压缩方法,也始终超过50mb。当我使用熊猫读取这个50mb的实木复合地板文件,然后将其重新写入另一个文件时,它变得小于500kb。

请参阅下面的parquet.net(RED)和Pyarrow(BLUE)之间的比较:

我们可以看到,列和行的数量是相同的。 我确实检查了内容,似乎还可以。 Obs:有一个varchar(8000)列包含大量数据。

这就是我获取实木复合地板元数据的方式:

import pandas as pd
import pyarrow.parquet as pq

# pd.set_option("max_colwidth",None)
# pd.set_option("max_seq_item",None)
# pd.set_option("min_rows",2)
# pd.set_option("max_rows",None)
# pd.set_option('max_columns',None)

parquet_file_net = pq.ParquetFile("parquetnetFile.parquet")
print(parquet_file_net.Metadata)

print()

parquet_file_py = pq.ParquetFile("pyarrowFile.parquet")
print(parquet_file_py.Metadata)

print()
print()

print(parquet_file_net.Metadata.row_group(0))
print()
print(parquet_file_py.Metadata.row_group(0))

我的c#代码基于以下代码,但是我做了一些更改: https://github.com/dazfuller/datatable-to-parquet

这是我的C#代码

    public static async Task<MemoryStream> ToParquetStream(DataTable dt)
            {
                var fields = GenerateSchema(dt);
                var parquetStream = new MemoryStream();
    
                using (var writer = new ParquetWriter(new Schema(fields),parquetStream))
                {
                    writer.CompressionMethod = CompressionMethod.Gzip;
                    writer.CompressionLevel = 2;
    
                    var range = Enumerable.Range(0,dt.Columns.Count);
                    var result = await range.ForEachAsyncInParallel(async c =>
                    {
                        return await Task.Run(() =>
                        {
                            // Determine the target data type for the column
                            var targettype = dt.Columns[c].DataType;
                            if (targettype == typeof(DateTime))
                            {
                                targettype = typeof(DateTimeOffset);
                            }
    
                            // Generate the value type,this is to ensure it can handle null values
                            var valueType = targettype.IsClass ? targettype : typeof(Nullable<>).MakeGenericType(targettype);
    
                            // Create a list to hold values of the required type for the column
                            var valuesArray = Array.CreateInstance(valueType,dt.Rows.Count);
    
                            // Get the data to be written to the parquet stream
                            for (int r = 0; r < dt.Rows.Count; r++)
                            {
                                DaTarow row = dt.Rows[r];
    
                                // Check if value is null,if so then add a null value
                                if (row[c] == null || row[c] == dbnull.Value)
                                {
                                    valuesArray.SetValue(null,r);
                                }
                                else
                                {
                                    // Add the value to the list,but if it's a DateTime then create it as a DateTimeOffset first
                                    if (dt.Columns[c].DataType == typeof(DateTime))
                                    {
                                        valuesArray.SetValue(new DateTimeOffset((DateTime)row[c]),r);
                                    }
                                    else
                                    {
                                        valuesArray.SetValue(row[c],r);
                                    }
                                }
                            }
    
                            return valuesArray;
                        });
                    });
    
                    using (var rgw = writer.CreateRowGroup())
                    {
                        for (int c = 0; c < dt.Columns.Count; c++)
                        {                        
                            rgw.WriteColumn(new Parquet.Data.DataColumn(fields[c],result[c]));
                        }
                    }
                }
    
                return parquetStream;
            }
            private static List<datafield> GenerateSchema(DataTable dt)
            {
                var fields = new List<datafield>(dt.Columns.Count);
    
                foreach (DataColumn column in dt.Columns)
                {
                    // Attempt to parse the type of column to a parquet data type
                    var success = Enum.TryParse<DataType>(column.DataType.Name,true,out var type);
    
                    // If the parse was not successful and it's source is a DateTime then use a DateTimeOffset,otherwise default to a string
                    if (!success && column.DataType == typeof(DateTime))
                    {
                        type = DataType.DateTimeOffset;
                    }
                    // In c# float is System.Single. That is why the parse fails
                    else if (!success && column.DataType == typeof(float))
                    {
                        type = DataType.Float;
                    }
                    else if (!success)
                    {
                        type = DataType.String;
                    }
    
                    fields.Add(new datafield(column.ColumnName,type));
                }
    
                return fields;
            }


public static async Task<R[]> ForEachAsyncInParallel<T,R>(this IEnumerable<T> list,Func<T,Task<R>> func)
        {
            var tasks = new List<Task<R>>();
            foreach (var value in list)
            {
                tasks.Add(func(value));
            }
            return await Task.WhenAll<R>(tasks);
        }

文件为什么这么大?

以下是parquet.net和pyarrow生成文件https://easyupload.io/m/28jo48

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)