指定要通过GroupByKey传递的TaggedOutput的类型作为CombinePerKey的一部分

问题描述

当我尝试基于apap Beam管道将项目从python 3.7迁移到3.8时,类型提示检查在该位置开始失败:

pcoll = (
    wrong_pcoll,some_pcoll_1,some_pcoll_2,some_pcoll_3,) | beam.Flatten(pipeline=pipeline)
pcoll | beam.CombinePerKey(MyCombineFn())  # << here

出现此错误

apache_beam.typehints.decorators.TypeCheckerror: Input type hint violation at GroupByKey: expected Tuple[TypeVariable[K],TypeVariable[V]],got Union[TaggedOutput,Tuple[Any,Any],_MyType1],_MyType2]]

wrong_pcoll实际上是一个TaggedOutput,因为它是从以前的ptransform上的一个标记输出接收的。

当作为TaggedOutput的wrong_pcoll类型(属于pcoll类型的一部分(与异常Union[TaggedOutput,_MyType2]]对应)的一部分传递给GrouByKey时,类型提示检查失败在CombinePerKey内部使用。

所以我有两个问题:

  1. 为什么它在python 3.7中起作用而在3.8上不起作用?
  2. 如何为标记输出指定类型?我尝试为PTransform的process()方法指定类型,该方法将它生成为它产生的所有输出类型的并集,但是由于某种原因,类型提示检查被选择为错误的类型。然后,我严格指定了所需的类型:Tuple[Any,Any],它已经起作用了。但这不是一种方法,因为process()还会产生其他类型,例如简单的str

作为一种解决方法,我可以将此wrong_pcoll通过带有beam.Maplambda x: x的简单.with_output_types(Tuple[Any,Any])进行传递,但这似乎不是解决它的明确方法

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)