Flink AvroDeserializationSchema for SpecificRecord-与通用类类型一起使用

问题描述

目标: 我想实现一个功能,该功能可重复使用,以基于作为类型参数传递的生成的类为特定记录创建AvroDeserializationSchema。生成的类是使用AvroHugger生成的,并继承了SpecificRecordBase。

问题: 无法找到设置通用类型参数以使编译工作的方法。 AvroDeserializationSchema.forSpecific需要获取一个类作为生成的参数。

功能如下:

to new-position
 ifelse any? targets
    [ move-to one-of targets ]
    [ print "Not enough agents in the neighborhood" ]
end

用法

  ConfluentRegistryAvroDeserializationSchema[T] = {
    ConfluentRegistryAvroDeserializationSchema.forSpecific(
      classOf[T],schemaRegistryUrl
    )
  }

这是我在运行代码时遇到的错误

val registry = AvroSerde.createEventDeserializerForRegistry[SomeGeneratedClass]("localhost")

我知道classOf不适用于任意类型(并且T是任意类型),尽管我如何在此处传递Class以便重新使用此函数传递任意生成的类,我也不知道。

因此,如果我尝试传入T的运行时类,则它也不起作用:

class type required but T found classOf[T]

错误

ConfluentRegistryAvroDeserializationSchema.forSpecific(
      classtag[T].runtimeClass,schemaRegistryUrl
    )

如果有人对如何实现这种抽象有所了解,我将不胜感激。

谢谢。

解决方法

我不是世界上最大的scala人,但我假设您需要为类型参数指定下限(这实际上是scala问题)。

您可以在Java签名中看到T必须扩展SpecificRecord

    public static <T extends SpecificRecord> ConfluentRegistryAvroDeserializationSchema<T> forSpecific(Class<T> tClass,String url)

所以您的函数还应该指定相同的下位类型

  ConfluentRegistryAvroDeserializationSchema[T >: SpecificRecord] = {
    ConfluentRegistryAvroDeserializationSchema.forSpecific(
      classOf[T],schemaRegistryUrl
    )
  }

您还可能想检查scala如何确定通用类型,例如here