问题描述
我使用 debezium 将数据从 Postgres 同步到 flink,并使用此代码创建引擎
this.engine = DebeziumEngine.create(Connect.class)
.using(properties)
.notifying(debeziumConsumer)
.using((success,message,error) -> {
if (!success && error != null) {
this.reportError(error);
}
})
.build();
我想在 flink 执行检查点时调用 ChangeEventSourceCoordinator#commitOffset
,但是 coordinator
在 BaseSourceTask
中是私有的,而 task
在 EmbeddedEngine
中是私有的,所以我可以' t 在我的代码中调用 commitOffset
,还有其他方法可以实现手动提交吗?
public final class EmbeddedEngine implements DebeziumEngine<SourceRecord>{
private SourceTask task;
}
public abstract class BaseSourceTask extends SourceTask {
private ChangeEventSourceCoordinator coordinator;
}
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)