Splitting large files by lines in GCP

Problem description

I want to split some very large files in GCP and then copy them over to AWS for processing by Lambda.

The files can be up to 50 GB and contain millions of lines. I am trying to split them into 100,000-line chunks for Lambda to process.

As far as I can tell, there is nothing in gsutil that can do this.

I tried writing the file splitter as a Cloud Function and deploying it on App Engine, but I ran into memory problems during testing. I moved up to an F4 instance and it still ran out of memory. This is the error I get while processing a file of only 500 MB:

Exceeded hard memory limit of 1024 MB with 1787 MB after servicing 0 requests total. Consider setting a larger instance class in app.yaml
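For reference, the suggestion in that message maps to the instance_class setting in app.yaml; a minimal sketch, assuming a Python 3 standard runtime (F4_1G is the largest F class and has roughly double the memory of F4, so it is at best a stopgap for files that can reach 50 GB):

runtime: python39
instance_class: F4_1G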

Here is the code I deployed to App Engine to do the file splitting:

import logging

from flask import Flask, request
from google.cloud import storage

app = Flask(__name__)
LOGGER = logging.getLogger(__name__)


@app.route('/')
def run():
    LOGGER.info(f"Request received with the following arguments: {request.args}")

    # Request args
    bucket_name = request.args.get('bucket_name')
    file_location = request.args.get('file_location')
    chunk_size = int(request.args.get('chunk_size', 100000))

    LOGGER.info(f"Getting files in bucket: [{bucket_name}] with prefix: [{file_location}]")
    storage_client = storage.Client()

    for blob in storage_client.list_blobs(bucket_name, prefix=file_location):
        blob_name = str(blob.name)
        if "json" in blob_name:
            LOGGER.info(f"Found blob: [{blob_name}]")
            blob_split = blob_name.split("/")
            file_name = blob_split[-1]

            bucket = storage_client.get_bucket(bucket_name)
            LOGGER.info(f"Downloading file: [{file_name}]")
            # Downloads the entire object into memory before splitting it
            download_blob = bucket.get_blob(blob_name)
            downloaded_blob_string = download_blob.download_as_string()
            downloaded_json_data = downloaded_blob_string.decode("utf-8").splitlines()
            LOGGER.info(f"Got blob: [{file_name}]")
            file_count = len(downloaded_json_data)
            LOGGER.info(f"Blob [{file_name}] has {file_count} rows")

            # Upload each chunk_size-line slice as a separate object
            for file_number in range(0, file_count, chunk_size):
                range_min = file_number
                range_max = min(file_number + chunk_size - 1, file_count - 1)
                LOGGER.info(f"Generating file for rows: {range_min+1} - {range_max+1}")
                split_file = "\n".join(downloaded_json_data[range_min:range_max+1]).encode("utf-8")
                LOGGER.info(f"Attempting upload of file for rows: {range_min+1} - {range_max+1}")
                upload_blob = bucket.blob(f"{file_location}split/{file_name}_split_{range_min+1}-{range_max+1}")
                upload_blob.upload_from_string(split_file)
                LOGGER.info(f"Upload complete for rows: {range_min+1} - {range_max+1}")
            LOGGER.info(f"Successfully split file: {file_name}")
    LOGGER.info(f"Completed all file splits for {file_location}")
    return "success"

Is there a more efficient way to do this? What other options do I have?
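For what it's worth, the memory blow-up in the code above comes from download_as_string(): the raw bytes, the decoded string, and the list of lines each hold a full copy of the file, so even a 500 MB object can push well past 1 GB. A minimal sketch of a streaming alternative, assuming a recent google-cloud-storage client where blob.open() provides file-like reads (the function name split_blob_streaming and the output naming are illustrative only):

from google.cloud import storage


def split_blob_streaming(bucket_name, blob_name, chunk_size=100000):
    """Read a blob line by line and upload chunk_size-line pieces,
    keeping at most one chunk in memory at a time."""
    client = storage.Client()
    bucket = client.bucket(bucket_name)

    lines = []
    part = 1
    # blob.open("rt") streams the object in ranges instead of
    # downloading the whole thing into memory at once
    with bucket.blob(blob_name).open("rt") as reader:
        for line in reader:
            lines.append(line)
            if len(lines) >= chunk_size:
                bucket.blob(f"{blob_name}_split_{part}").upload_from_string("".join(lines))
                lines = []
                part += 1
    if lines:
        bucket.blob(f"{blob_name}_split_{part}").upload_from_string("".join(lines))

Memory stays bounded by one chunk with this kind of approach, although pushing 50 GB through a single HTTP request raises its own timeout concerns.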

I want to automate this file-splitting process, which we have to run several times a month. Am I best off spinning up a GCE instance each time so that I can run:

 split -l 100000 file.json

and then shutting it down once the split is finished?
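If the GCE route is the way to go, the whole job on the instance boils down to a handful of commands; a rough sketch, with the bucket and paths as made-up placeholders:

 gsutil cp gs://my-bucket/input/file.json .
 split -l 100000 file.json file_part_
 gsutil -m cp file_part_* gs://my-bucket/input/split/
 sudo shutdown -h now

The instance only needs enough disk for the original file plus its pieces; memory is not a concern because split streams through the file.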
