无法使用Flink Processor API恢复检查点状态

问题描述

主程序正在使用kafka事件,然后过滤->映射-> keyBy-> CEP->接收器。我编写了另一个单独的简单程序来读取检查点目录,如下所示:

object StateReader extends App {

  val path = "file://...."

  val env = ExecutionEnvironment.getExecutionEnvironment

  val chk = Savepoint.load(env.getJavaEnv,path,new FsstateBackend(path))

  val ds = chk.readKeyedState("cep",new CepOperatorReadFunction,Typeinformation.of(classOf[KEY]),Typeinformation.of(classOf[VALUE]))
  println(ds.count())

}

class CepOperatorReadFunction extends KeyedStateReaderFunction[KEY,VALUE] {
  override def open(parameters: Configuration): Unit = {

  }
  override def readKey(k: KEY,context: KeyedStateReaderFunction.Context,collector: Collector[VALUE]): Unit = {

  }//end readKey
}//end class CepOperatorReadFunction

但是我遇到以下异常:

Caused by: java.lang.IllegalStateException: Unexpected state handle type,expected: class org.apache.flink.runtime.state.KeyGroupsstateHandle,but found: class org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:120)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
    ... 13 more

flink-conf.yaml中有一些配置

state.backend: rocksdb
state.checkpoints.dir: hdfs:///.../checkpoints
state.savepoints.dir: hdfs:///.../savepoints
state.backend.incremental: true
state.backend.rocksdb.memory.write-buffer-ratio: 0.6
state.backend.rocksdb.localdir: /var/lib/.../rocksdb
execution.checkpointing.interval: 900000
execution.checkpointing.timeout: 600000
execution.checkpointing.unaligned: true
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.min-pause: 0

任何想法为何会发生异常以及如何解决问题?

谢谢

解决方法

没有开箱即用的支持,可以轻松读取CEP操作员的状态。因此,要实现您的KeyedStateReaderFunction,您必须深入研究CEP实现,找到使用的ValueStateMapState,并实现使用相同状态描述符的阅读器。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...