在本地环境中,结果值和数据流结果值不同

问题描述

这是我的输入数据。

ㅡ.Input(本地)

'Iot,c c++ python,2015','Web,java spring,2016','Iot,c c++ spring,2017',

这是在本地环境中运行apache-beam的结果。

ㅡ.Outout(本地)

Iot,2015,c,1
Iot,c++,python,2017,2
Iot,spring,2
Web,2016,java,1
Web,1

但是,当我运行google-cloud-platform数据流并将其放在存储桶中时,结果会有所不同。

ㅡ。储存(桶)

Web,1

这是我的代码

ㅡ。代码

#apache_beam
from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam as beam

pipeline_options = PipelineOptions(
    project='project-id',runner='dataflow',temp_location='bucket-location'
)
def pardo_dofn_methods(test=None):
    import apache_beam as beam


    class split_category_advanced(beam.DoFn):
      def __init__(self,delimiter=','):
        self.delimiter = delimiter
        self.k = 1
        self.pre_processing = []
        self.window = beam.window.GlobalWindow()
        self.year_dict = {}
        self.category_index = 0
        self.language_index = 1
        self.year_index = 2;
        self.result = []

      def setup(self):
          print('setup')

      def start_bundle(self):
          print('start_bundle')

      def finish_bundle(self):
          
          print('finish_bundle')
          for ppc_index in range(len(self.pre_processing)) :
              if self.category_index == 0 or self.category_index%3 == 0 :
                  if self.pre_processing[self.category_index] not in self.year_dict :
                          self.year_dict[self.pre_processing[self.category_index]] = {}

          
          if ppc_index + 2 == 2 or ppc_index + 2 == self.year_index :
              # { category : { year : {} } }
              if self.pre_processing[self.year_index] not in self.year_dict[self.pre_processing[self.category_index]] :
                     self.year_dict[self.pre_processing[self.category_index]][self.pre_processing[self.year_index]] = {}
              # { category : { year : c : { },c++ : { },java : { }}}
              language = self.pre_processing[self.year_index-1].split(' ')

              for lang_index in range(len(language)) :
                    if language[lang_index] not in self.year_dict[self.pre_processing[self.category_index]][self.pre_processing[self.year_index]] :
                      self.year_dict[self.pre_processing[self.category_index]][self.pre_processing[self.year_index]][language[lang_index]] = 1
                    else :
                        self.year_dict[self.pre_processing[self.category_index]][self.pre_processing[self.year_index]][
                            language[lang_index]] += 1
              self.year_index = self.year_index + 3
          self.category_index = self.category_index + 1

      csvFormat = ''
      for category,nested in self.year_dict.items() :
          for year in nested :
              for language in nested[year] :
                csvFormat+= (category+","+str(year)+","+language+","+str(nested[year][language]))+"\n"
                print(csvFormat)

      yield beam.utils.windowed_value.WindowedValue(
          value=csvFormat,#value = self.pre_processing,timestamp=0,windows=[self.window],)

  def process(self,text):
    for word in text.split(self.delimiter):
        self.pre_processing.append(word)
    print(self.pre_processing)

#with beam.Pipeline(options=pipeline_options) as pipeline:
with beam.Pipeline() as pipeline:
  results = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          'Iot,])
      | 'Split category advanced' >> beam.ParDo(split_category_advanced(','))
      | 'Save' >> beam.io.textio.WritetoText("bucket-location")
      | beam.Map(print) \
  )
   if test:
       return test(results)

if __name__ == '__main__':
  pardo_dofn_methods_basic()

用于执行简单单词计数的代码。 CSV列具有[类别,年份,语言,数量] 例如IoT,2015,c,1

感谢您阅读。

解决方法

获得不同输出的最可能原因是并行性。使用DataflowRunner时,操作将尽可能并行运行。由于您使用的是ParDo进行计数,因此当元素Iot,c c++ spring,2017分配给两个不同的工作程序时,该计数不会如您所愿的那样发生(您在ParDo中进行计数)。

您需要使用组合器(4.2.4

在这里,您有一个简单的示例来说明要做什么:

    def generate_kvs(element,csv_delimiter=',',field_delimiter=' '):
        splitted = element.split(csv_delimiter)
        fields = splitted[1].split(field_delimiter)

        # final key to count is (Source,year,language)
        return [(f"{splitted[0]},{splitted[2]},{x}",1) for x in fields]


    p = beam.Pipeline()

    elements = ['Iot,c c++ python,2015','Web,java spring,2016','Iot,2017',2017']

    (p | Create(elements)
       | beam.ParDo(generate_kvs)
       | beam.combiners.Count.PerKey()
       | "Format" >> Map(lambda x: f"{x[0]},{x[1]}")
       | Map(print))

    p.run()

无论元素在工作人员中的分布如何,这都会输出所需的结果。

请注意,Apache Beam的想法是尽可能并行化,并且为了聚合,您需要组合器

我建议您检查一些wordcounts examples,以便掌握合并器的作用

编辑

合并器的说明:

ParDo是在元素间进行的操作。它使用一个元素,进行一些操作,然后将输出发送到下一个PTransform。当您需要汇总数据(计数元素,总和,连接语句...)时,按元素进行的操作不起作用,则需要采用PCollection的东西(即许多具有逻辑的元素)并输出某些东西。这是合并器进入的地方,它们以PCollection为基础执行操作,该操作可以跨工作程序进行(Map-Reduce操作的一部分)

在您的示例中,您使用的是Class参数将计数存储在ParDo中,因此,当元素通过它时,它将在类中更改该参数。当所有元素都经过同一个工作程序时,这将起作用,因为该类是在工作程序的基础上“创建”的(即,它们不共享状态),但是当有更多的工作程序时,(使用ParDo)计数就在分别在每个工人中发生