Problem description
I am preprocessing a stream of data by reading it in batches: the workload is read from Google Cloud Storage (GCS), processed with Dataflow, and uploaded back to GCS.
After the processing finished, however, I checked GCS and found:
result-001.csv
result-002.csv
result-003.csv
This is how the data was split up and stored. Can't I combine these files into one?
# -*- coding: utf-8 -*-
import apache_beam as beam
def preprocessing(fields):
    # Each input line is one CSV row: the label followed by 784 pixel values.
    fields = fields.split(",")
    header = "label"
    for i in range(0, 784):
        header += ",pixel" + str(i)
    # Build a one-hot style label list, e.g. label 5 -> "[0,0,0,0,0,5,0,0,0,0],"
    label_list_str = "["
    for i in range(0, 10):
        if fields[0] == str(i):
            label_list_str += str(i)
        else:
            label_list_str += "0"
        if i != 9:
            label_list_str += ","
    label_list_str += "],"
    # Append the pixel values after the label list.
    for i in range(1, len(fields)):
        label_list_str += fields[i]
        if i != len(fields) - 1:
            label_list_str += ","
    yield label_list_str
def run(project, bucket, dataset):
    argv = [
        "--project={0}".format(project),
        "--job_name=kaggle-dataflow",
        "--save_main_session",
        "--region=asia-northeast1",
        "--staging_location=gs://{0}/kaggle-bucket-v1/".format(bucket),
        "--temp_location=gs://{0}/kaggle-bucket-v1/".format(bucket),
        "--max_num_workers=8",
        "--worker_region=asia-northeast3",
        "--worker_disk_type=compute.googleapis.com/projects//zones//diskTypes/pd-ssd",
        "--autoscaling_algorithm=THROUGHPUT_BASED",
        "--runner=DataflowRunner",
    ]
    result_output = "gs://kaggle-bucket-v1/result/result.csv"
    filename = "gs://{}/train.csv".format(bucket)

    pipeline = beam.Pipeline(argv=argv)
    ptransform = (
        pipeline
        | "Read from GCS" >> beam.io.ReadFromText(filename)
        | "kaggle data pre-processing" >> beam.FlatMap(preprocessing)
    )
    (ptransform
     | "events:out" >> beam.io.WriteToText(result_output)
    )
    pipeline.run()
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Run pipeline on the cloud")
    parser.add_argument("--project", dest="project", help="Unique project ID", required=True)
    parser.add_argument("--bucket", dest="bucket", help="Bucket where your data were ingested", required=True)
    parser.add_argument("--dataset", dest="dataset", help="BigQuery dataset")
    args = vars(parser.parse_args())

    print("Correcting timestamps and writing to BigQuery dataset {}".format(args["dataset"]))
    run(project=args["project"], bucket=args["bucket"], dataset=args["dataset"])
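The preprocessing step is pure Python, so it can be sanity-checked locally without running Dataflow at all. A minimal sketch, using a made-up four-column row (label plus three pixels) instead of the real 785-column data; the unused header loop is omitted:

```python
def preprocessing(fields):
    # Same generator logic as in the pipeline above.
    fields = fields.split(",")
    label_list_str = "["
    for i in range(0, 10):
        if fields[0] == str(i):
            label_list_str += str(i)
        else:
            label_list_str += "0"
        if i != 9:
            label_list_str += ","
    label_list_str += "],"
    for i in range(1, len(fields)):
        label_list_str += fields[i]
        if i != len(fields) - 1:
            label_list_str += ","
    yield label_list_str

# A toy row: label 5 followed by three pixel values.
print(list(preprocessing("5,0,128,255")))
# ['[0,0,0,0,0,5,0,0,0,0],0,128,255']
```

Note that the slot gets the digit value itself rather than 1, so a label of 0 encodes as all zeros; worth double-checking that downstream code expects this.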
Thanks for reading :)
Solution
The method beam.io.WriteToText automatically splits the output into multiple files (shards) when writing, for best performance. If you need exactly one file, you can explicitly pass the argument num_shards=1.

From the documentation:
num_shards (int) – The number of files (shards) used for output. If not set, the service will decide on the optimal number of shards. Constraining the number of shards is likely to reduce the performance of a pipeline. Setting this value is not recommended unless you require a specific number of output files.
Your WriteToText call should look like this:
(ptransform
 | "events:out" >> beam.io.WriteToText(
       result_output, num_shards=1
   )
)
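Alternatively, if you would rather keep multiple shards for write performance and combine them afterwards, GCS supports composing objects (gsutil compose, or Blob.compose in the Python storage client). The same idea for local files, as a minimal sketch; merge_shards and the file names are illustrative, not part of Beam:

```python
import glob
import os
import tempfile

def merge_shards(pattern, dest):
    # Concatenate every file matching `pattern` (sorted for a
    # deterministic order) into one output file.
    with open(dest, "w") as out:
        for path in sorted(glob.glob(pattern)):
            with open(path) as shard:
                out.write(shard.read())

# Demo with two fake shard files in a temporary directory.
tmp = tempfile.mkdtemp()
for i, body in enumerate(["a\nb\n", "c\n"]):
    with open(os.path.join(tmp, "result-%03d.csv" % (i + 1)), "w") as f:
        f.write(body)

merge_shards(os.path.join(tmp, "result-*.csv"), os.path.join(tmp, "result.csv"))
print(open(os.path.join(tmp, "result.csv")).read())
```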