问题描述
情况:每个月都有一个 csv 进入 AWS S3。供应商可以根据需要从文件中添加/删除/修改列。因此,架构无法提前知道。要求是在 SNowflake 中即时创建一个表并将数据加载到该表中。 Matillion 是我们的 ELT 工具。
这是我目前所做的。
- 设置 Lambda 以检测文件的到达,将其转换为 JSON,上传到另一个 S3 目录并将文件名添加到 SQS。
- Matillion 检测 SQS 消息并将带有 JSON 数据的文件加载到 SF 表的 Variant 列中。
- SF 存储过程采用变体列并根据 JSON 数据中的字段数生成一个表。 SF 中的 VARIANT 列仅在其 JSON 数据时以这种方式工作。很遗憾,不支持 CSV。
这适用于 10,000 行。当我使用超过 1GB(超过 10M 行)的完整文件运行此文件时,问题就出现了。它在运行时因磁盘空间不足错误导致 lambda 作业崩溃。
这些是我目前想到的替代方案:
- 将 EFS 卷附加到 lambda 并在上传到 S3 之前使用它来存储 JSON 文件。 JSON 数据文件比对应的 CSV 文件大得多,我预计 json 文件大约为 10-20GB,因为该文件有超过 1000 万行。
- Matillion 有一个 Excel 查询组件,它可以在其中获取标题并动态创建表格并加载文件。我想我可以将标题行从 CSV 转换为 Lambda 中的 XLX 文件,将其传递给 Matillion,让它创建结构,然后在创建结构后加载 csv 文件。
我还有哪些其他选择?考虑因素包括用于未来大型 CSV 或类似需求的良好可重复设计模式、EFS 成本、我是否充分利用了我可用的工具?谢谢!!!
解决方法
为什么不将初始 csv 文件拆分为多个文件,然后按照您当前的方式处理每个文件?
,你为什么要把 CSV 转换成 JSON; CSV 直接加载到表中,无需进行任何 JSON 特殊要求的数据转换,横向展平将 json 转换为关系数据行;以及为什么不使用 Snowflake Snowpipe 功能将数据直接加载到 Snowflake 而不使用 Matallion。您可以在加载到 Snowflake 之前将大型 csv 文件拆分为更小的块;这将有助于在 SF 仓库之间分配数据处理负载。
,我还使用 Matillion 将 CSV 文件从 SFTP 加载到 Snowflake,但不知道架构。
在我的过程中,我在 Snowflake 中创建了一个“临时”表,其中包含 50 个 VARCHAR 列(我们的文件不应超过 50 列)。我们的数据总是包含文本、日期或数字,所以 VARCHAR 不是问题。然后我可以将 .csv 文件加载到临时表中。我相信这也适用于来自 S3 的文件。
这至少会使数据进入 Snowflake。但是,鉴于您的情况,我不确定如何创建“最终”表。 我可以想象能够使用标题行和/或对每列中包含的数据“类型”进行一些分析,以确定所需的列类型。
但是如果您可以创建“最终”表,则可以将数据从临时表移过来。或者更改临时表本身。
,这可以使用外部表来实现,其中外部表将映射到单个列,分隔符将是一个换行符。外部表还有一个特殊的虚拟列,可以对其进行处理以动态提取所有列,然后使用存储过程在任何给定时间根据列数创建一个表。有一个有趣的视频讨论了雪花 (https://youtu.be/hRNu58E6Kmg) 中的这种限制