如何限制使用FileIO写入的每个文件的行数

问题描述

是否可以使用TextIO限制每个书面分片中的行数,或者可以是FileIO?

示例:

  1. 从Big Query-Batch Job中读取行(例如,结果为19500行)。
  2. 进行一些转换。
  3. 文件写入Google Cloud存储(19个文件,每个文件限制为1000条记录,一个文件有500条记录)。
  4. 触发了Cloud Function,以针对GCS中的每个文件向外部API发出POST请求。

这是到目前为止我正在尝试做的,但是不起作用(试图限制每个文件1000行):

BQ_DATA = p | 'read_bq_view' >> beam.io.Read(
        beam.io.BigQuerySource(query=query,use_standard_sql=True)) | beam.Map(json.dumps)

BQ_DATA | beam.WindowInto(GlobalWindows(),Repeatedly(trigger=AfterCount(1000)),accumulation_mode=AccumulationMode.disCARDING)
        | WritetoFiles(path='fileio',destination="csv")

我在概念上是错误的,还是有其他方法可以实现此目的?

解决方法

您可以在ParDo中实现对GCS的写入步骤,并限制要包含在“批处理”中的元素数量,如下所示:

from apache_beam.io import filesystems

class WriteToGcsWithRowLimit(beam.DoFn):
  def __init__(self,row_size=1000):
    self.row_size = row_size
    self.rows = []

  def finish_bundle(self):
     if len(self.rows) > 0:
        self._write_file()

  def process(self,element):
    self.rows.append(element)
    if len(self.rows) >= self.row_size:
        self._write_file()

  def _write_file(self):
    from time import time
    new_file = 'gs://bucket/file-{}.csv'.format(time())
    writer = filesystems.FileSystems.create(path=new_file)
    writer.write(self.rows) # may need to format
    self.rows = []
    writer.close()
BQ_DATA  | beam.ParDo(WriteToGcsWithRowLimit())

请注意,这不会创建少于1000行的任何文件,但是您可以更改process中的逻辑来做到这一点。

(编辑1以处理余数)

(编辑2以停止使用计数器,因为文件将被覆盖)