debezium 如何手动提交偏移量

问题描述

我使用 debezium 将数据从 Postgres 同步到 flink,并使用此代码创建引擎

this.engine = DebeziumEngine.create(Connect.class)
            .using(properties)
            .notifying(debeziumConsumer)
            .using((success,message,error) -> {
                if (!success && error != null) {
                    this.reportError(error);
                }
            })
            .build();

我想在 flink 执行检查点时调用 ChangeEventSourceCoordinator#commitOffset,但是 coordinatorBaseSourceTask 中是私有的,而 taskEmbeddedEngine 中是私有的,所以我可以' t 在我的代码调用 commitOffset,还有其他方法可以实现手动提交吗?

public final class EmbeddedEngine implements DebeziumEngine<SourceRecord>{
       private SourceTask task;
}
public abstract class BaseSourceTask extends SourceTask {
       private ChangeEventSourceCoordinator coordinator;
}

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)