Simple Apache Beam pipeline throws TypeError: an integer is required

Problem description

I have this simple Apache Beam pipeline written in Python.

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from apache_beam.runners.direct.direct_runner import DirectRunner
from apache_beam.options.pipeline_options import PipelineOptions

p = beam.Pipeline(InteractiveRunner(underlying_runner=DirectRunner()), options=PipelineOptions())

class Foo(beam.DoFn):
  def process(self, element, *args, **kwargs):
    # Split each (key, value) pair and emit (key, {key: value}).
    k, v = element
    yield (k, {k: v})


# The second field is an int in the first tuple and a tuple in the second.
a = [('a', 1), ('a', ('b', 1))]

x0 = p | "0" >> beam.Create(a) | "2" >> beam.ParDo(Foo())
r = p.run()
r.wait_until_finish()
print(r.get(x0))

Running the code above raises the following error:

Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 883, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 497, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1028, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 177, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 155, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
  File "apache_beam/runners/worker/opcounters.py", line 214, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
  File "apache_beam/runners/worker/opcounters.py", line 233, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
  File "apache_beam/coders/coder_impl.py", line 1162, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 1172, line 768, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 187, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 686, in apache_beam.coders.coder_impl.VarIntCoderImpl.estimate_size
  File "apache_beam/coders/stream.pyx", line 225, in apache_beam.coders.stream.get_varint_size
TypeError: an integer is required
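The last frame, get_varint_size in stream.pyx, estimates how many bytes a variable-length (varint) encoding of an integer would take, so it can only accept an int; here it was handed a value that is not one. The sketch below is a hypothetical pure-Python illustration of varint size estimation (the name varint_size is made up, and this is not Beam's Cython code). It assumes 7 payload bits per byte and 64-bit two's complement for negative numbers, and shows why a non-integer value produces this exact message.

```python
def varint_size(value):
    """Return the number of bytes a base-128 varint encoding of `value` needs.

    Illustrative only: mimics the idea behind a varint size estimate,
    not Beam's actual implementation.
    """
    if not isinstance(value, int):
        # A varint coder can only size integers; tuples, dicts, etc. are rejected.
        raise TypeError("an integer is required")
    if value < 0:
        # Assumption: negatives are treated as 64-bit two's complement.
        value &= (1 << 64) - 1
    size = 1
    while value >= 0x80:  # each byte carries 7 payload bits
        value >>= 7
        size += 1
    return size


print(varint_size(1))    # 1
print(varint_size(300))  # 2
try:
    varint_size(('b', 1))
except TypeError as e:
    print(e)             # an integer is required
```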

However, if a is a list of lists instead of a list of tuples, the same code works:

a = [('a', 1), ('a', ('b', 1))]  ->  a = [['a', 1], ['a', ['b', 1]]]
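As a sanity check outside Beam, Foo's logic can be applied with plain Python to both versions of a. The helper foo_process below is a hypothetical stand-in that copies the body of Foo.process. Both inputs unpack and produce the same keys and values (tuples vs lists aside), which suggests the failure lies in how Beam encodes the elements rather than in Foo itself; that is consistent with the traceback ending inside apache_beam/coders.

```python
def foo_process(element):
    # Same body as Foo.process in the pipeline above, without Beam.
    k, v = element
    yield (k, {k: v})


tuples = [('a', 1), ('a', ('b', 1))]
lists = [['a', 1], ['a', ['b', 1]]]

out_tuples = [out for e in tuples for out in foo_process(e)]
out_lists = [out for e in lists for out in foo_process(e)]

print(out_tuples)  # [('a', {'a': 1}), ('a', {'a': ('b', 1)})]
print(out_lists)   # [('a', {'a': 1}), ('a', {'a': ['b', 1]})]
```

In both cases the value under 'a' is an int for the first element and a sequence for the second, so the input data itself is well formed for Foo.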

Answer

I've confirmed that this is a bug and filed it as https://issues.apache.org/jira/browse/BEAM-10833