问题描述
当我尝试基于apap Beam管道将项目从python 3.7迁移到3.8时,类型提示检查在该位置开始失败:
pcoll = (
wrong_pcoll,some_pcoll_1,some_pcoll_2,some_pcoll_3,) | beam.Flatten(pipeline=pipeline)
pcoll | beam.CombinePerKey(MyCombineFn()) # << here
出现此错误:
apache_beam.typehints.decorators.TypeCheckerror: Input type hint violation at GroupByKey: expected Tuple[TypeVariable[K],TypeVariable[V]],got Union[TaggedOutput,Tuple[Any,Any],_MyType1],_MyType2]]
wrong_pcoll
实际上是一个TaggedOutput,因为它是从以前的ptransform上的一个标记输出接收的。
当作为TaggedOutput的wrong_pcoll
类型(属于pcoll
类型的一部分(与异常Union[TaggedOutput,_MyType2]]
对应)的一部分传递给GrouByKey时,类型提示检查失败在CombinePerKey内部使用。
所以我有两个问题:
- 为什么它在python 3.7中起作用而在3.8上不起作用?
- 如何为标记的输出指定类型?我尝试为PTransform的
process()
方法指定类型,该方法将它生成为它产生的所有输出类型的并集,但是由于某种原因,类型提示检查被选择为错误的类型。然后,我严格指定了所需的类型:Tuple[Any,Any]
,它已经起作用了。但这不是一种方法,因为process()
还会产生其他类型,例如简单的str
。
作为一种解决方法,我可以将此wrong_pcoll
通过带有beam.Map
和lambda x: x
的简单.with_output_types(Tuple[Any,Any])
进行传递,但这似乎不是解决它的明确方法
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)