S3的Apache Flink状态读取文件

问题描述

我有一个flink批处理作业,该作业从S3中读取一个非常大的镶木文件,然后将json存储到Kafka主题中。

问题是如何使文件读取过程有状态?我的意思是,每当工作中断或被破坏时,工作应从先前的阅读状态开始吗?作业重新启动时,我不想将重复的项目发送给Kafka。

这是我的示例代码

val env = ExecutionEnvironment.getExecutionEnvironment
val input = Parquet.input[User](new Path(s"s3a://path"))
env.createInput(input)
  .filter(r => Option(r.token).getorElse("").nonEmpty)

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)