问题描述
这是我的输入数据。
ㅡ.Input(本地)
'Iot,c c++ python,2015','Web,java spring,2016','Iot,c c++ spring,2017',
这是在本地环境中运行apache-beam的结果。
ㅡ.Outout(本地)
Iot,2015,c,1
Iot,c++,python,2017,2
Iot,spring,2
Web,2016,java,1
Web,1
但是,当我运行google-cloud-platform数据流并将其放在存储桶中时,结果会有所不同。
ㅡ。储存(桶)
Web,1
这是我的代码。
ㅡ。代码
#apache_beam
from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam as beam
pipeline_options = PipelineOptions(
project='project-id',runner='dataflow',temp_location='bucket-location'
)
def pardo_dofn_methods(test=None):
import apache_beam as beam
class split_category_advanced(beam.DoFn):
def __init__(self,delimiter=','):
self.delimiter = delimiter
self.k = 1
self.pre_processing = []
self.window = beam.window.GlobalWindow()
self.year_dict = {}
self.category_index = 0
self.language_index = 1
self.year_index = 2;
self.result = []
def setup(self):
print('setup')
def start_bundle(self):
print('start_bundle')
def finish_bundle(self):
print('finish_bundle')
for ppc_index in range(len(self.pre_processing)) :
if self.category_index == 0 or self.category_index%3 == 0 :
if self.pre_processing[self.category_index] not in self.year_dict :
self.year_dict[self.pre_processing[self.category_index]] = {}
if ppc_index + 2 == 2 or ppc_index + 2 == self.year_index :
# { category : { year : {} } }
if self.pre_processing[self.year_index] not in self.year_dict[self.pre_processing[self.category_index]] :
self.year_dict[self.pre_processing[self.category_index]][self.pre_processing[self.year_index]] = {}
# { category : { year : c : { },c++ : { },java : { }}}
language = self.pre_processing[self.year_index-1].split(' ')
for lang_index in range(len(language)) :
if language[lang_index] not in self.year_dict[self.pre_processing[self.category_index]][self.pre_processing[self.year_index]] :
self.year_dict[self.pre_processing[self.category_index]][self.pre_processing[self.year_index]][language[lang_index]] = 1
else :
self.year_dict[self.pre_processing[self.category_index]][self.pre_processing[self.year_index]][
language[lang_index]] += 1
self.year_index = self.year_index + 3
self.category_index = self.category_index + 1
csvFormat = ''
for category,nested in self.year_dict.items() :
for year in nested :
for language in nested[year] :
csvFormat+= (category+","+str(year)+","+language+","+str(nested[year][language]))+"\n"
print(csvFormat)
yield beam.utils.windowed_value.WindowedValue(
value=csvFormat,#value = self.pre_processing,timestamp=0,windows=[self.window],)
def process(self,text):
for word in text.split(self.delimiter):
self.pre_processing.append(word)
print(self.pre_processing)
#with beam.Pipeline(options=pipeline_options) as pipeline:
with beam.Pipeline() as pipeline:
results = (
pipeline
| 'Gardening plants' >> beam.Create([
'Iot,])
| 'Split category advanced' >> beam.ParDo(split_category_advanced(','))
| 'Save' >> beam.io.textio.WritetoText("bucket-location")
| beam.Map(print) \
)
if test:
return test(results)
if __name__ == '__main__':
pardo_dofn_methods_basic()
用于执行简单单词计数的代码。 CSV列具有[类别,年份,语言,数量] 例如IoT,2015,c,1
感谢您阅读。
解决方法
获得不同输出的最可能原因是并行性。使用DataflowRunner
时,操作将尽可能并行运行。由于您使用的是ParDo进行计数,因此当元素Iot,c c++ spring,2017
分配给两个不同的工作程序时,该计数不会如您所愿的那样发生(您在ParDo中进行计数)。
您需要使用组合器(4.2.4)
在这里,您有一个简单的示例来说明要做什么:
def generate_kvs(element,csv_delimiter=',',field_delimiter=' '):
splitted = element.split(csv_delimiter)
fields = splitted[1].split(field_delimiter)
# final key to count is (Source,year,language)
return [(f"{splitted[0]},{splitted[2]},{x}",1) for x in fields]
p = beam.Pipeline()
elements = ['Iot,c c++ python,2015','Web,java spring,2016','Iot,2017',2017']
(p | Create(elements)
| beam.ParDo(generate_kvs)
| beam.combiners.Count.PerKey()
| "Format" >> Map(lambda x: f"{x[0]},{x[1]}")
| Map(print))
p.run()
无论元素在工作人员中的分布如何,这都会输出所需的结果。
请注意,Apache Beam的想法是尽可能并行化,并且为了聚合,您需要组合器
我建议您检查一些wordcounts examples,以便掌握合并器的作用
编辑
合并器的说明:
ParDo是在元素间进行的操作。它使用一个元素,进行一些操作,然后将输出发送到下一个PTransform。当您需要汇总数据(计数元素,总和,连接语句...)时,按元素进行的操作不起作用,则需要采用PCollection的东西(即许多具有逻辑的元素)并输出某些东西。这是合并器进入的地方,它们以PCollection为基础执行操作,该操作可以跨工作程序进行(Map-Reduce操作的一部分)
在您的示例中,您使用的是Class参数将计数存储在ParDo中,因此,当元素通过它时,它将在类中更改该参数。当所有元素都经过同一个工作程序时,这将起作用,因为该类是在工作程序的基础上“创建”的(即,它们不共享状态),但是当有更多的工作程序时,(使用ParDo)计数就在分别在每个工人中发生