Running multiple CEP pattern rules

Problem description

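We are getting an error when running multiple CEP patterns. Our use case: we plan to build a rule-based engine with a large number of rules on top of Flink, and we are doing a POC for it. For the POC we have around 1000 pattern-based rules, which we translate into CEP patterns and run against a keyed stream of event data to detect matches. We partition the stream by orgId, and every rule has to run for every organization. Here is the code we wrote to implement this: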

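import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

// Key the event stream by organization so every rule is evaluated per orgId.
DataStream<Event> partitionedInput =
    eventStream.keyBy((KeySelector<Event,String>) Event::getorgid);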
List<Rule> ruleList = new ArrayList<>();
// 100 iterations with two rules each: 200 rules in total.
for (int i = 0; i < 100; i++) {
  ruleList.add(new Rule("rule" + i,"process1","process2","process3"));
  ruleList.add(
      new Rule("rule" + (i + 500),"process4","process5","process6"));
}
for (Rule rule : ruleList) {
  String st = rule.getStart();
  String mi = rule.getMid();
  String en = rule.getEnd();
  String nm = rule.getName();
  Pattern<Event,?> pattern =
      Pattern.begin(
          Pattern.<Event>begin("start")
              .where(
                  new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event value) throws Exception {
                      return value.getProcess().equals(st);
                    }
                  })
              .followedBy("middle")
              .where(
                  new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event event) {
                      return !event.getProcess().equals(mi);
                    }
                  })
              .optional()
              .followedBy("end")
              .where(
                  new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event event) {
                      return event.getProcess().equals(en);
                    }
                  }));
  // One CEP operator is created per rule, all consuming the same keyed stream.
  PatternStream<Event> patternStream = CEP.pattern(partitionedInput,pattern);
  DataStream<String> alerts =
      patternStream.process(
          new PatternProcessFunction<Event,String>() {
            @Override
            public void processMatch(
                Map<String,List<Event>> map,Context context,Collector<String> collector)
                throws Exception {
              Event start = map.containsKey("start") ? map.get("start").get(0) : null;
              Event middle = map.containsKey("middle") ? map.get("middle").get(0) : null;
              Event end = map.containsKey("end") ? map.get("end").get(0) : null;
              StringJoiner joiner = new StringJoiner(",");
              joiner
                  .add("Rule : " + nm + " ")
                  .add((start == null ? "" : start.getId()))
                  .add((middle == null ? "" : middle.getId()))
                  .add((end == null ? "" : end.getId()));
              collector.collect(joiner.toString());
            }
          });
  alerts.print();
}
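The snippet relies on a `Rule` class that is not shown. A minimal version consistent with the calls it makes (a four-argument constructor plus `getName`, `getStart`, `getMid` and `getEnd`) might look like the following. The field meanings are inferred from how the pattern uses them, so treat the class as an assumption, not the class used in the job. The sketch also reproduces the rule-building loop, which makes it easy to check how many CEP operators the job creates.

```java
import java.util.ArrayList;
import java.util.List;

public class RuleDemo {

    // Hypothetical Rule class, reconstructed only from the calls in the
    // question's snippet: a four-argument constructor and four getters.
    static final class Rule {
        private final String name;
        private final String start; // process that opens a match
        private final String mid;   // process the optional middle event must NOT have
        private final String end;   // process that closes a match

        Rule(String name, String start, String mid, String end) {
            this.name = name;
            this.start = start;
            this.mid = mid;
            this.end = end;
        }

        String getName() { return name; }
        String getStart() { return start; }
        String getMid() { return mid; }
        String getEnd() { return end; }
    }

    // Same rule-building loop as the question: two rules per iteration.
    static List<Rule> buildRules(int iterations) {
        List<Rule> ruleList = new ArrayList<>();
        for (int i = 0; i < iterations; i++) {
            ruleList.add(new Rule("rule" + i, "process1", "process2", "process3"));
            ruleList.add(new Rule("rule" + (i + 500), "process4", "process5", "process6"));
        }
        return ruleList;
    }

    public static void main(String[] args) {
        List<Rule> rules = buildRules(100);
        System.out.println(rules.size());                          // 200
        System.out.println(rules.get(1).getName());                // rule500
        System.out.println(rules.get(rules.size() - 1).getName()); // rule599
    }
}
```

As written, the loop yields 200 rules (`rule0` to `rule99` and `rule500` to `rule599`), so the job builds 200 CEP operators. `buildRules(500)` gives the roughly 1000 rules mentioned in the use case, named `rule0` to `rule999`.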

We tried running this code on a Flink cluster with 1 task manager and 4 task slots, and the task manager crashed with this error:

Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
        at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
        at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1462)
        at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1139)
        at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1079)
        at org.apache.flink.runtime.executiongraph.Execution.markFailed(Execution.java:910)
        at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$5(Execution.java:623)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
        at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at akka.actor.Actor.aroundReceive(Actor.scala:517)
        at akka.actor.Actor.aroundReceive$(Actor.scala:515)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException: Invocation of public abstract java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutorGateway.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time) timed out.
        at java.base/java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:367)
        at java.base/java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:376)
        at java.base/java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:1019)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
        at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
        at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:234)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
        at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
        at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1079)
        at akka.dispatch.OnComplete.internal(Future.scala:263)
        at akka.dispatch.OnComplete.internal(Future.scala:261)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
        at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
        at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
        at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
        at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650)
        at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
        at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870)
        at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
        at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
        at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868)
        at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
        at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279)
        at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283)
        at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:235)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.util.concurrent.TimeoutException: Invocation of public abstract java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutorGateway.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.api.common.time.Time) timed out.
        at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:60)
        at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$4(Execution.java:599)
        at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        ... 1 more
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@192.168.0.4:52041/user/rpc/taskmanager_0#-1397184270]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
        at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:635)
        at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650)
        at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
        at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870)
        at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
        at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
        at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868)
        at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
        at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279)
        at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283)
        at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:235)
        ... 1 more
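A rough count of what the job deploys helps put the thread question in numbers. The sketch below is back-of-the-envelope arithmetic under stated assumptions, none of them measured on the cluster: every `CEP.pattern(...)` call becomes a separate operator behind the `keyBy` shuffle, so it cannot be chained to the upstream operator and runs as its own task (one thread) per parallel subtask; the `print()` sink is chained onto it; the job runs with parallelism 4 (assumed, to fill the four slots); and slot sharing places one subtask of every operator in each slot.

```java
// Back-of-the-envelope count of CEP tasks for the job in the question.
// Assumptions (not measured): one task thread per CEP operator per parallel
// subtask, parallelism 4, and slot sharing spreading subtasks evenly.
public class TaskCountEstimate {

    // Total CEP subtasks = number of CEP operators (one per rule) x parallelism.
    static long cepSubtasks(int rules, int parallelism) {
        return (long) rules * parallelism;
    }

    // With slot sharing, each slot hosts one subtask of every operator.
    static long cepSubtasksPerSlot(int rules, int parallelism, int slots) {
        return cepSubtasks(rules, parallelism) / slots;
    }

    public static void main(String[] args) {
        int slots = 4;
        int parallelism = 4; // assumed: one subtask per slot
        for (int rules : new int[] {200, 1000}) {
            System.out.printf("rules=%d -> CEP subtasks=%d, per slot=%d%n",
                rules, cepSubtasks(rules, parallelism),
                cepSubtasksPerSlot(rules, parallelism, slots));
        }
    }
}
```

Under these assumptions the 200-rule loop asks one task manager to start 800 CEP tasks (200 per slot), and 1000 rules would mean 4000 (1000 per slot). Each task is deployed with its own `submitTask` call, so that volume is one plausible contributor to the `submitTask` timeouts above, though this is an unverified guess rather than a diagnosis.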

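Can someone help? Why does this code fail? Is there a way to make this scale, or is there a better way to do it? Given that each CEP operator creates a thread, is it viable in production to have this many threads per task slot? Does the CEP library support combining multiple patterns in a single operator/thread?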
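To make the idea of "multiple patterns in a single operator" concrete, here is a plain-Java sketch (no Flink dependencies) of one keyed component that evaluates every rule against each incoming event and keeps per-organization, per-rule state in a map. The class, its methods and the matching semantics are hypothetical and deliberately simplified: a rule fires when an event with the rule's start process is later followed, for the same orgId, by an event with its end process. It ignores the optional middle step and Flink CEP's contiguity and skip semantics, so it is not equivalent to the patterns above. In a Flink job, logic of this shape could sit in a single `KeyedProcessFunction` with the map replaced by keyed state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not a Flink API: one matcher evaluates all rules.
// Simplified semantics (NOT Flink CEP): start event, then later end event,
// per orgId; only the first pending start per (orgId, rule) is kept.
public class MultiRuleMatcher {

    static final class Rule {
        final String name, start, end;
        Rule(String name, String start, String end) {
            this.name = name; this.start = start; this.end = end;
        }
    }

    static final class Event {
        final String orgId, id, process;
        Event(String orgId, String id, String process) {
            this.orgId = orgId; this.id = id; this.process = process;
        }
    }

    private final List<Rule> rules;
    // orgId -> (rule name -> id of the pending start event)
    private final Map<String, Map<String, String>> pending = new HashMap<>();

    MultiRuleMatcher(List<Rule> rules) {
        this.rules = new ArrayList<>(rules);
    }

    /** Feeds one event to every rule and returns the alerts it completes. */
    List<String> onEvent(Event e) {
        Map<String, String> state = pending.computeIfAbsent(e.orgId, k -> new HashMap<>());
        List<String> alerts = new ArrayList<>();
        for (Rule r : rules) {
            String startId = state.get(r.name);
            if (startId != null && e.process.equals(r.end)) {
                alerts.add(r.name + ": " + startId + " -> " + e.id);
                state.remove(r.name);      // rule completed for this org
            } else if (startId == null && e.process.equals(r.start)) {
                state.put(r.name, e.id);   // remember the first start event
            }
        }
        return alerts;
    }

    public static void main(String[] args) {
        MultiRuleMatcher m = new MultiRuleMatcher(List.of(
            new Rule("rule1", "process1", "process3"),
            new Rule("rule2", "process4", "process6")));
        List<Event> events = List.of(
            new Event("a", "e1", "process1"),
            new Event("b", "e5", "process3"),
            new Event("a", "e2", "process4"),
            new Event("a", "e3", "process3"),
            new Event("a", "e4", "process6"));
        for (Event e : events) {
            m.onEvent(e).forEach(System.out::println);
        }
        // prints "rule1: e1 -> e3" then "rule2: e2 -> e4"
    }
}
```

The design trade-off is that one operator (one task per parallel subtask) serves all rules, so the thread count no longer grows with the number of rules; the cost is that the matching logic is hand-written instead of coming from the CEP library.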
