问题描述
我正在处理一个lambda,它将将存储在Bucket-A(源)中的CSV文件转换为NDJSON并将其移至Bucket-B(目的地)
下面的逻辑可以很好地用于小型文件,但是我的CSV文件应该超过200 MB,其中一些约为2.5GB,即使lambda设置为最大超时,该逻辑也会超时。
我正在看一则帖子,当时在谈论使用lambda tmp空间直接将信息写入/附加到文件中,该文件可以上传到S3,但是tmp空间的最大大小约为500 MB
感谢您通读。
对于解决此问题的任何帮助,我们深表感谢。
import boto3
import ndjson
import csv
from datetime import datetime,timezone
from io import StringIO
import os
def lambda_handler(event,context):
errMsg = None
target_resp_list = []
l_utcdatetime = datetime.utcNow()
l_timestamp = l_utcdatetime.strftime('%Y%m%d%H%M%s')
s3 = boto3.resource('s3')
s3_client = boto3.client('s3')
dynamodb = boto3.resource('dynamodb',region_name=os.environ['AWS_REGION'])
for record in event["Records"]:
# Source bucket and key of the new file landed
source_bucket = record["s3"]["bucket"]["name"]
source_key = record["s3"]["object"]["key"]
source_file_name = source_key.split("/")[-1].split(".")[0]
bucket = s3.Bucket(source_bucket)
obj = bucket.Object(key=source_key)
response = obj.get()
records = StringIO(response['Body'].read().decode())
# loop through the csv records and add it to the response list,while adding the snapshot_datetime to each record
for row in csv.DictReader(records):
row['source_snapshot_datetime'] = f'{l_utcdatetime}'
target_resp_list.append(row)
# The below attributes are used in copying the ndjson file to the destination bucket
l_target_bucket = os.getenv("TargetBucket")
l_target_prefix = os.getenv("TargetPrefix")
l_target_key = f"{l_target_prefix}/{source_file_name}_{l_timestamp}.ndjson"
# Moving the ndjson file to SNowflake staging bucket
try:
s3_client.put_object(Body=ndjson.dumps(target_resp_list),Bucket=l_target_bucket,Key=l_target_key
)
print("File moved to destination bucket.")
except Exception as ex1:
errMsg = f"Error while copying the file from source to destination bucket - {ex1}"
# Raise exception in case of copy fail
if errMsg is not None:
raise Exception(errMsg)
解决方法
Lambda每次执行最多可以运行15分钟。 我建议首先检查在本地首先处理文件的最坏情况是什么。如果您期望大型文件,请尝试将lambda内存增加到满足您要求的最大可行值。
提示:
- 尝试压缩文件,将压缩了GB的CSV文件减小到兆字节,可能会压缩很多文本。
- 尝试提前拆分工作,如果可以将一个大文件拆分为一个lambda,然后再由其他人处理,则您不会在意执行timo-out。
将此内容留给以后可能会找的人使用。
我认为问题在于ndjson.dumps花费大量时间来转换列表并将其推送到S3,因此我所做的是使用一个计数器对源记录进行分块(每个记录50K),然后调用dumpChunkToS3(),这基本上是逻辑转储到S3。
需要额外的条件语句,因为行数/记录数不会被50K除以(在我的情况下是租约)
# loop through the csv records and add it to the response list,while adding the snapshot_datetime to the record
for row in csv.DictReader(records):
row['source_snapshot_datetime'] = f'{l_utcdatetime}'
rowscnt += 1
target_resp_list.append(row)
if rowscnt == 50000:
chunk_id += 1
respMsg = dumpChunkToS3(s3_client,source_file_name,target_resp_list,chunk_id)
rowscnt = 0
del target_resp_list[:]
if rowscnt > 0:
chunk_id += 1
respMsg = dumpChunkToS3(s3_client,chunk_id)