How to write output to dynamic paths with Apache Beam Python

Problem description

I'm new to Apache Beam. My situation is as follows:

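I have multiple events in JSON format. In each event, the event_time field indicates when the event was created, and I use event_time to compute its creation date. I want to write each event under the partition for its date. My code looks like this: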

import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.pvalue import TaggedOutput
import json
import time


class EventFormatter(beam.DoFn):

    def process(self, element, *args, **kwargs):
        # Emit one flat record per product. Build a new dict each time instead of
        # mutating and re-yielding the same dict, which Beam does not allow.
        for prop in element['properties']:
            yield {
                'messageid': element['messageid'],
                'userid': element['userid'],
                'event_time': element['event_time'],
                'productid': prop['productid'],
            }


class DateParser(beam.DoFn):

    def process(self, element, *args, **kwargs):
        key = time.strftime('%Y-%m-%d', time.localtime(element.get('event_time')))
        print(key, element)
        yield TaggedOutput(key, element)


with beam.Pipeline() as pipeline:
  events = (
      pipeline
      | 'Sample Events' >> beam.Create([
          {"messageid": "6b1291ea-e50d-425b-9940-44c2aff089c1","userid": "user-78","event_time": 1598516997,"properties": [{"productid": "product-173"}]},{"messageid": "b8b14eb3-8e39-42a3-9528-a323b10a7686","userid": "user-74","event_time": 1598346837,"properties": [{"productid": "product-143"},{"productid": "product-144"}]}
        ])
      | beam.ParDo(EventFormatter())
      | beam.ParDo(DateParser())
  )


  output = events | "Parse Date" >> WriteToText('/Users/oguz.aydin/Desktop/event_folder/date={}/'.format(....))

I can't figure out how to fill in the .format(...) part. When I run the code and print the results, I get, for example:

('2020-08-27',{'productid': 'product-173','userid': 'user-78','event_time': 1598516997,'messageid': '6b1291ea-e50d-425b-9940-44c2aff089c1'})
('2020-08-25',{'productid': 'product-143','userid': 'user-74','event_time': 1598346837,'messageid': 'b8b14eb3-8e39-42a3-9528-a323b10a7686'})
('2020-08-25',{'productid': 'product-144','messageid': 'b8b14eb3-8e39-42a3-9528-a323b10a7686'})

Here I want to write the 2 events dated 2020-08-25 under the date=2020-08-25 folder and the other one under date=2020-08-27.

In short, I want to write each event under the folder for its creation date.
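The target layout can be made concrete with a plain-Python sketch (no Beam involved) that flattens the sample events and groups them by date folder. `partition_for` and `flatten` are hypothetical helper names, and computing the date in UTC is an assumption; the code above uses `time.localtime`, which depends on the machine's time zone.

```python
from collections import defaultdict
from datetime import datetime, timezone

# The sample events from the question.
events = [
    {"messageid": "6b1291ea-e50d-425b-9940-44c2aff089c1", "userid": "user-78",
     "event_time": 1598516997, "properties": [{"productid": "product-173"}]},
    {"messageid": "b8b14eb3-8e39-42a3-9528-a323b10a7686", "userid": "user-74",
     "event_time": 1598346837,
     "properties": [{"productid": "product-143"}, {"productid": "product-144"}]},
]


def partition_for(event_time):
    """Folder name for a Unix timestamp, computed in UTC (an assumption)."""
    day = datetime.fromtimestamp(event_time, tz=timezone.utc).strftime('%Y-%m-%d')
    return f'date={day}'


def flatten(event):
    """One record per product, like EventFormatter."""
    for prop in event['properties']:
        yield {'messageid': event['messageid'], 'userid': event['userid'],
               'event_time': event['event_time'], 'productid': prop['productid']}


# Group product ids by the folder they should end up in.
layout = defaultdict(list)
for event in events:
    for record in flatten(event):
        layout[partition_for(record['event_time'])].append(record['productid'])

for folder, products in sorted(layout.items()):
    print(folder, products)
# date=2020-08-25 ['product-143', 'product-144']
# date=2020-08-27 ['product-173']
```

In Beam terms this is a group-by on the date key, which is what the answer below builds on.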

How can I do this?

Thanks for your help,

Oguz.

Solution

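In your code you are using multiple outputs (TaggedOutput). Multiple outputs are for routing the output of one DoFn (a ParDo) into different downstream transforms, and that wiring is static for the whole pipeline: it is fixed when the pipeline is constructed, so it cannot create a new destination for each date found in the data.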

If you want to write data to different files depending on its content, you can implement a DoFn that performs the writes itself.

Something like this:

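import json
import apache_beam as beam
from apache_beam.io.gcp.gcsio import GcsIO

class WriteByKey(beam.DoFn):
    def process(self, kv):
        key, value = kv
        # GcsIO().open only accepts read/write modes: GCS objects cannot be appended
        # to, so a later element with the same key overwrites this file.
        with GcsIO().open(f'gs://bucket/path/{key}.extension', 'w') as fp:
            fp.write(json.dumps(value).encode('utf-8'))  # GcsIO file objects take bytes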

Change the DateParser DoFn to yield a (date, value) tuple instead of a TaggedOutput, and change the pipeline to the following:

with beam.Pipeline() as pipeline:
  events = (
      pipeline
      | 'Sample Events' >> beam.Create([
          {"messageid": "6b1291ea-e50d-425b-9940-44c2aff089c1","userid": "user-78","event_time": 1598516997,"properties": [{"productid": "product-173"}]},{"messageid": "b8b14eb3-8e39-42a3-9528-a323b10a7686","userid": "user-74","event_time": 1598346837,"properties": [{"productid": "product-143"},{"productid": "product-144"}]}
        ])
      | beam.ParDo(EventFormatter())
      | beam.ParDo(DateParser()) | beam.ParDo(WriteByKey())
  )


More concretely, to write several elements per key into one file, group them by key first, for example:

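import json
import apache_beam as beam
from apache_beam.io.gcp.gcsio import GcsIO

class WriteByKey(beam.DoFn):
    def process(self, kvs):
        # After GroupByKey, all values with the same key arrive together.
        key, values = kvs
        with GcsIO().open(f'gs://bucket/path/{key}.extension', 'w') as fp:
            for value in values:
                # One JSON document per line; GcsIO file objects take bytes.
                fp.write(json.dumps(value).encode('utf-8') + b'\n')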

with beam.Pipeline() as pipeline:
  events = (
      pipeline
      | ...
      | beam.ParDo(EventFormatter())
      | beam.ParDo(DateParser())
  )
  output = events | beam.GroupByKey() | beam.ParDo(WriteByKey())

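Note that the runner may need to retry elements on failure. So rather than writing directly to the final output, it is safer to write to a temporary file and rename it to the final path only once the write has succeeded, e.g.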

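import json
import random
import apache_beam as beam
from apache_beam.io.gcp.gcsio import GcsIO

class WriteByKey(beam.DoFn):
    def process(self, kvs):
        key, values = kvs
        nonce = random.randint(1, 10**9)  # int bounds: a float such as 1e9 is rejected by Python 3.12+
        path = f'gs://bucket/path/{key}.extension'
        temp_path = f'{path}-{nonce}'
        gcs = GcsIO()
        with gcs.open(temp_path, 'w') as fp:
            for value in values:
                fp.write(json.dumps(value).encode('utf-8') + b'\n')
        # Move the finished file to its final name only after the write succeeded.
        gcs.rename(temp_path, path)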
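On a local file system the same write-then-rename pattern can be sketched with only the standard library: `tempfile.mkstemp` creates the temporary file inside the destination folder (so it is on the same file system), and `os.replace` moves it into place, which is atomic on POSIX when source and destination share a file system. The function name and the `events.json` file name are hypothetical.

```python
import json
import os
import tempfile


def write_partition_atomically(base_dir, key, values):
    """Write values as JSON lines to base_dir/date=<key>/events.json via a temp file."""
    folder = os.path.join(base_dir, f'date={key}')
    os.makedirs(folder, exist_ok=True)
    final_path = os.path.join(folder, 'events.json')
    # Create the temp file next to the target so os.replace stays on one file system.
    fd, temp_path = tempfile.mkstemp(dir=folder, suffix='.tmp')
    try:
        with os.fdopen(fd, 'w') as fp:
            for value in values:
                fp.write(json.dumps(value) + '\n')
        # Readers see either the previous file or the complete new one.
        os.replace(temp_path, final_path)
    except BaseException:
        os.unlink(temp_path)  # do not leave a half-written temp file behind
        raise
    return final_path


base = tempfile.mkdtemp()
path = write_partition_atomically(
    base, '2020-08-25', [{'productid': 'product-143'}, {'productid': 'product-144'}])
print(os.listdir(os.path.join(base, 'date=2020-08-25')))  # ['events.json']
```

A retried attempt simply writes a fresh temp file and replaces the final file again, so no partial output is left at the final path.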
