如何使用 Flink 作为 Case Class (scala) 读取镶木地板文件?

问题描述

借助 spark,我们可以轻松读取 parquet 文件并将其用作具有以下代码的 Case Class:

spark.read.parquet("my_parquet_table").as[MyCaseClass]

使用 Flink,我很难做到这一点。我的案例类来自一个 Avro 模式,所以它是一个 SpecificRecord。

我尝试了以下方法

val parquetInputFormat = new ParquetRowInputFormat(new Path(path),messageType)
env.readFile(parquetInputFormat,path)

这里的问题是 messageType,我无法将案例类和 avro 架构转换为有效的 messageType。 我试过这个:

val messageType = ParquetSchemaConverter.toParquetType(Type@R_770_404[email protected](classOf[MyCaseClass],true)

以以下错误结尾: class org.apache.flink.formats.avro.typeutils.AvroTypeInfo cannot be cast to class org.apache.flink.api.java.typeutils.RowTypeInfo

我可以尝试使用 table-api,但这意味着必须自己创建所有表模式,并且维护起来会很痛苦。 如果有人可以向我指出实施示例,或提出任何可能有帮助的建议,我们将不胜感激。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)