scala – 将新数据附加到分区的拼贴文件

我正在写一个ETL过程,我将需要读取小时日志文件,分割数据并保存.我正在使用Spark(在Databricks中).
日志文件是CSV,因此我读取它们并应用模式,然后执行我的转换.

我的问题是,如何保存每小时的数据作为镶木地板格式,但是附加到现有的数据集?保存时,我需要按数据帧中存在的4列进行分区.

这是我的保存线

data
    .filter(validPartnerIds($"partnerID"))
    .write
    .partitionBy("partnerID","year","month","day")
    .parquet(saveDestination)

问题是如果目标文件夹存在,则保存会抛出错误.
如果目的地不存在,那么我不会附加我的文件.

我尝试使用.mode(“append”),但是我发现Spark有时会在中途失败,所以我最终失去了我的数据写入量以及我还需要写多少.

我正在使用镶木地板,因为分区大大增加了我在未来的查询.同样,我必须将数据写入磁盘上的一些文件格式,并且不能使用诸如Druid或Cassandra之类的数据库.

任何关于如何分割数据框和保存文件的建议(坚持使用镶木地板或其他格式)都是非常感激的.

解决方法

如果你需要附加文件,你必须使用追加模式.我不知道你希望生成多少个分区,但是我发现如果你有很多分区,partitionBy会导致一些问题(内存和IO问题一样).

如果您认为您的问题是由于写入操作太长时间造成的,我建议您尝试以下两件事情:

1)通过添加配置来使用snappy:

conf.set("spark.sql.parquet.compression.codec","snappy")

2)禁止在SparkContext上的hadoopConfiguration中生成元数据文件,如下所示:

sc.hadoopConfiguration.set("parquet.enable.summary-Metadata","false")

元数据文件会产生一些耗时(见this blog post),但根据this,它们并不重要.就个人而言,我总是禁用它们,没有问题.

如果生成多个分区(> 500),我恐怕我能做的最好的是建议您查看一个不使用追加模式的解决方案 – 我根本没有设法使partitionBy与多个分区一起工作.

相关文章

共收录Twitter的14款开源软件,第1页Twitter的Emoji表情 Tw...
Java和Scala中关于==的区别Java:==比较两个变量本身的值,即...
本篇内容主要讲解“Scala怎么使用”,感兴趣的朋友不妨来看看...
这篇文章主要介绍“Scala是一种什么语言”,在日常操作中,相...
这篇文章主要介绍“Scala Trait怎么使用”,在日常操作中,相...
这篇文章主要介绍“Scala类型检查与模式匹配怎么使用”,在日...