问题描述
借助 spark,我们可以轻松读取 parquet 文件并将其用作具有以下代码的 Case Class:
spark.read.parquet("my_parquet_table").as[MyCaseClass]
使用 Flink,我很难做到这一点。我的案例类来自一个 Avro 模式,所以它是一个 SpecificRecord。
我尝试了以下方法:
val parquetInputFormat = new ParquetRowInputFormat(new Path(path),messageType)
env.readFile(parquetInputFormat,path)
这里的问题是 messageType,我无法将案例类和 avro 架构转换为有效的 messageType。 我试过这个:
val messageType = ParquetSchemaConverter.toParquetType(Type@R_770_404[email protected](classOf[MyCaseClass],true)
以以下错误结尾:
class org.apache.flink.formats.avro.typeutils.AvroTypeInfo cannot be cast to class org.apache.flink.api.java.typeutils.RowTypeInfo
我可以尝试使用 table-api,但这意味着必须自己创建所有表模式,并且维护起来会很痛苦。 如果有人可以向我指出实施示例,或提出任何可能有帮助的建议,我们将不胜感激。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)