问题描述
我有两个列表a
和b
,它们之间有一些共同的元素,我想找到这些共同的元素及其数量,为此我编写了以下程序。
import functools
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from apache_beam.runners.direct.direct_runner import DirectRunner
from apache_beam.options.pipeline_options import PipelineOptions
options = PipelineOptions()
p = beam.Pipeline(InteractiveRunner(underlying_runner=DirectRunner()),options=options)
def form_pair(element,side_input):
for i,e in enumerate(side_input):
if e == element:
return i,e
a = ['a','b','c','b']
b = ['a','a','d','e','f']
x0 = p | "0" >> beam.Create(a) | "1" >> beam.distinct()
x1 = beam.pvalue.AsList(x0)
x3 = p | "2" >> beam.Create(b)
x4 = x3 | "3" >> beam.Map(functools.partial(form_pair,side_input=x1))
x5 = x4 | "4" >> beam.combiners.Count.PerKey()
r = p.run().wait_until_finish()
print(r.get(x5))
这给了我以下错误
TypeError: 'AsList' object is not iterable [while running '3']
解决方法
我将侧面输入错误地传递给beam.Map
函数,这是正确的方法
x4 = x3 | "3" >> beam.Map(form_pair,x1)
而不是错误的x4 = x3 | "3" >> beam.Map(functools.partial(form_pair,side_input=x1))
。