Apache Beam KinesisIO Java处理管道-应用程序状态,错误处理和容错?

问题描述

我正在处理我的第一个Apache Beam管道,以处理来自AWS Kinesis的数据流。我熟悉Kafka的概念,该概念如何处理消费者的偏移量/状态,并具有实施Apache风暴/火花处理的经验。

在阅读完文档后,我成功地使用KinesisIO Java SDK创建了一个工作束管道,以侦听AWS Kinesis数据流以转换和打印消息。但是,想知道有关如何在apache beam w.r.t中处理以下区域的任何参考实现或指针。 KinesisIO-

  1. 如何在Kinesis流中唯一标识消费者应用程序(类似于Kafka中的消费者组ID)-我是说对吗,它基于apache Beam的应用程序名称,任何使用KCL的消费者都跟踪其状态DynamoDB;总是&apache Beam KinesisIO真的是真的吗?

  2. 如何强制使用者开始处理数据流它的分片从更早的地方断开,即在使用者重新启动或处理中出现任何错误异常的情况下(类似于抵消管理,其中没有Kakfa中的每个使用者groupId)。即使我在处理了Kinesis流中的少量数据后重新启动管道,InitialPositionInStream.TRIM_HORIZON总是从最早的可用数据流开始。

  3. ack在Kinesis数据流中如何工作,即,消费者如何在进一步增加分片中的序列/位置之前,处理/更新使用getRecords()提取的数据流的检查点?有什么方法可以控制消费者应用程序中的这些行为,以确保何时重新启动消费者时何时成功确认消息以保存应用程序状态并从这些位置开始?

  4. 在处理来自Kinesis流的后续数据的后续数据流时(即应用程序是继续提取数据还是中止流程),业务异常的影响(在管道的任何阶段)。

解决方法

  1. py.exe c:\users\danh\mypythonscripts\hello.py利用后台的AWS开发工具包从Kinesis读取数据,并定期检索Shard Iterator的更新以从Kinesis碎片中获取记录。

  2. 您为此尝试过KinesisIO.Read吗?

  3. 在这里查看我的答案:https://stackoverflow.com/a/62349838/10687325

  4. 如果它是未知异常,则管道将被停止。