问题描述
我有一个flink批处理作业,该作业从S3中读取一个非常大的镶木文件,然后将json存储到Kafka主题中。
问题是如何使文件读取过程有状态?我的意思是,每当工作中断或被破坏时,工作应从先前的阅读状态开始吗?作业重新启动时,我不想将重复的项目发送给Kafka。
这是我的示例代码
val env = ExecutionEnvironment.getExecutionEnvironment
val input = Parquet.input[User](new Path(s"s3a://path"))
env.createInput(input)
.filter(r => Option(r.token).getorElse("").nonEmpty)
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)