Set difference using Apache Beam

Problem description

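I have two lists, a and b, that share some elements. I want to find those common elements and how often they occur, so I wrote the following program.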

import functools
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from apache_beam.runners.direct.direct_runner import DirectRunner
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions()

p = beam.Pipeline(InteractiveRunner(underlying_runner=DirectRunner()),options=options)

def form_pair(element,side_input):
  for i,e in enumerate(side_input):
    if e == element:
      return i,e


a = ['a','b','c','b']
b = ['a','a','d','e','f']

x0 = p | "0" >> beam.Create(a) | "1" >> beam.Distinct()
x1 = beam.pvalue.AsList(x0)


x3 = p | "2" >> beam.Create(b)
x4 = x3 | "3" >> beam.Map(functools.partial(form_pair,side_input=x1))
x5 = x4 | "4" >> beam.combiners.Count.PerKey()


r = p.run().wait_until_finish()

print(r.get(x5))

This gives me the following error:

TypeError: 'AsList' object is not iterable [while running '3']

Solution

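I was passing the side input to beam.Map incorrectly. Beam only swaps an AsList wrapper for the materialized list when the wrapper is handed to beam.Map as an extra argument; when it is hidden inside functools.partial, the function receives the raw AsList object, which is not iterable. The correct way is

x4 = x3 | "3" >> beam.Map(form_pair,x1)

instead of the incorrect

x4 = x3 | "3" >> beam.Map(functools.partial(form_pair,side_input=x1))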
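
For illustration, here is a minimal sketch of the whole pipeline with the side input passed as an extra argument. It is not the original program: the helper names in_side_input and count_common are hypothetical, it uses beam.Filter plus Count.PerElement instead of form_pair and Count.PerKey (so elements of b that are missing from a are dropped rather than mapped to None), and it runs on the default runner instead of the InteractiveRunner.

```python
def in_side_input(element, side_input):
    """Return True when `element` occurs in the materialized side input."""
    return element in side_input


def count_common(a, b):
    """Print (element, count) for every element of b that also occurs in a.

    Hypothetical helper for illustration only, not part of the original post.
    """
    # Imported here so in_side_input can be exercised without Beam installed.
    import apache_beam as beam

    with beam.Pipeline() as p:
        # Deduplicate a; this PCollection becomes the side input.
        distinct_a = (
            p
            | "CreateA" >> beam.Create(a)
            | "DistinctA" >> beam.Distinct()
        )

        _ = (
            p
            | "CreateB" >> beam.Create(b)
            # The AsList wrapper is passed as an extra argument, so Beam
            # hands in_side_input a real list at run time.
            | "KeepCommon" >> beam.Filter(in_side_input, beam.pvalue.AsList(distinct_a))
            | "CountPerElement" >> beam.combiners.Count.PerElement()
            | "Print" >> beam.Map(print)
        )
```

Calling count_common(['a','b','c','b'], ['a','a','d','e','f']) should print a single pair for 'a' with a count of 2, since 'a' is the only element of b that also appears in a.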