Writing files to GCS with Dataflow

Problem description

I am preprocessing data by reading it in as a batch with Dataflow.

The workload is read from Google Cloud Storage (GCS), processed with Dataflow, and the results are uploaded back to GCS.

But after processing the data, I checked GCS and found:

result-001.csv

result-002.csv

result-003.csv

This is how the data ended up split and stored. Is there a way to combine these files into a single one?

#-*- coding: utf-8 -*-
import apache_beam as beam
import csv
import json
import os
import re
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json

def preprocessing(fields):
    # Turn one CSV line "label,pixel0,...,pixel783" into a string whose first
    # element is a one-hot style label list, followed by the pixel values.
    fields = fields.split(",")
    header = "label"
    for i in range(0, 784):
        header += (",pixel" + str(i))
    label_list_str = "["
    label_list = []
    for i in range(0, 10):
        if fields[0] == str(i):
            label_list_str += str(i)
        else:
            label_list_str += "0"
        if i != 9:
            label_list_str += ","
    label_list_str += "],"
    for i in range(1, len(fields)):
        label_list_str += fields[i]
        if i != len(fields) - 1:
            label_list_str += ","
    yield label_list_str


def run(project, bucket, dataset):
    # Dataflow pipeline options.
    argv = [
        "--project={0}".format(project),
        "--job_name=kaggle-dataflow",
        "--save_main_session",
        "--region=asia-northeast1",
        "--staging_location=gs://{0}/kaggle-bucket-v1/".format(bucket),
        "--temp_location=gs://{0}/kaggle-bucket-v1/".format(bucket),
        "--max_num_workers=8",
        "--worker_region=asia-northeast3",
        "--worker_disk_type=compute.googleapis.com/projects//zones//diskTypes/pd-ssd",
        "--autoscaling_algorithm=THROUGHPUT_BASED",
        "--runner=DataflowRunner",
    ]

    result_output = 'gs://kaggle-bucket-v1/result/result.csv'
    filename = "gs://{}/train.csv".format(bucket)

    pipeline = beam.Pipeline(argv=argv)

    # Read the raw CSV from GCS and run the preprocessing step.
    ptransform = (pipeline
                  | "Read from GCS" >> beam.io.ReadFromText(filename)
                  | "kaggle data pre-processing" >> beam.FlatMap(preprocessing)
                  )

    # Write the processed lines back to GCS.
    (ptransform
     | "events:out" >> beam.io.WriteToText(
            result_output
        )
     )

    pipeline.run()

if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Run pipeline on the cloud")
    parser.add_argument("--project",dest="project",help="Unique project ID",required=True)
    parser.add_argument("--bucket",dest="bucket",help="Bucket where your data were ingested",required=True)
    parser.add_argument("--dataset",dest="dataset",help="BigQuery dataset")

    args = vars(parser.parse_args())

    print("Correcting timestamps and writing to BigQuery dataset {}".format(args["dataset"]))

    run(project=args["project"],bucket=args["bucket"],dataset=args["dataset"])

Thanks for reading :)

Solution

The beam.io.WriteToText transform automatically splits its output into multiple files (shards) when writing, for best performance. If you only need one file, you can explicitly add the parameter num_shards=1.

num_shards (int) – The number of files (shards) used for output. If not set, the service will decide on the optimal number of shards. Constraining the number of shards may hurt the performance of a pipeline. Setting this value is not recommended unless you require a specific number of output files.

Your write to text should then look like this:

(ptransform
 | "events:out" >> beam.io.WriteToText(
        result_output,
        num_shards=1
    )
 )
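
Note that even with num_shards=1, WriteToText still appends a shard suffix to the file name (e.g. result.csv-00000-of-00001). If you also want the object to be named exactly result.csv, you can additionally pass an empty shard_name_template, which is a standard WriteToText parameter. A minimal sketch, assuming the same result_output as above:

(ptransform
 | "events:out" >> beam.io.WriteToText(
        result_output,
        num_shards=1,
        # An empty template drops the "-00000-of-00001" suffix; only use this
        # together with num_shards=1, otherwise shards would overwrite each other.
        shard_name_template=""
    )
 )

If the single-shard write becomes a bottleneck on larger datasets, another option is to keep the default sharding and merge the shard objects after the pipeline finishes (for example with GCS object composition via gsutil compose), but for a dataset of this size num_shards=1 is the simplest fix.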