> Hi,
> We are running into errors when running multiple CEP patterns. Here's our
> use case:
> We are planning to build a rule-based engine on top of Flink with a huge
> number of rules and are doing a POC for that. For the POC we have around
> 1000 pattern-based rules, which we translate into CEP patterns and run on a
> keyed stream of event data to detect matches. We partition the stream by
> orgId, and each rule needs to run for each org. Here's the code we've
> written to implement that:
> // Imports used by the snippet below (Event and Rule are our own POJOs):
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Map;
> import java.util.StringJoiner;
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.cep.CEP;
> import org.apache.flink.cep.PatternStream;
> import org.apache.flink.cep.functions.PatternProcessFunction;
> import org.apache.flink.cep.pattern.Pattern;
> import org.apache.flink.cep.pattern.conditions.SimpleCondition;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.util.Collector;
>
> DataStream<Event> eventStream = null; // actual source omitted here
> DataStream<Event> partitionedInput =
>     eventStream.keyBy((KeySelector<Event, String>) Event::getOrgid);
>
> // Build the rule set for the POC (200 rules in this snippet).
> List<Rule> ruleList = new ArrayList<>();
> for (int i = 0; i < 100; i++) {
>   ruleList.add(new Rule("rule" + i, "process1", "process2", "process3"));
>   ruleList.add(new Rule("rule" + (i + 500), "process4", "process5", "process6"));
> }
>
> // One CEP pattern (and one pattern operator) per rule.
> for (Rule rule : ruleList) {
>   String st = rule.getStart();
>   String mi = rule.getMid();
>   String en = rule.getEnd();
>   String nm = rule.getName();
>
>   Pattern<Event, ?> pattern =
>       Pattern.begin(
>           Pattern.<Event>begin("start")
>               .where(
>                   new SimpleCondition<Event>() {
>                     @Override
>                     public boolean filter(Event value) throws Exception {
>                       return value.getProcess().equals(st);
>                     }
>                   })
>               .followedBy("middle")
>               .where(
>                   new SimpleCondition<Event>() {
>                     @Override
>                     public boolean filter(Event event) {
>                       return !event.getProcess().equals(mi);
>                     }
>                   })
>               .optional()
>               .followedBy("end")
>               .where(
>                   new SimpleCondition<Event>() {
>                     @Override
>                     public boolean filter(Event event) {
>                       return event.getProcess().equals(en);
>                     }
>                   }));
>
>   PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);
>
>   DataStream<String> alerts =
>       patternStream.process(
>           new PatternProcessFunction<Event, String>() {
>             @Override
>             public void processMatch(
>                 Map<String, List<Event>> map, Context context, Collector<String> collector)
>                 throws Exception {
>               Event start = map.containsKey("start") ? map.get("start").get(0) : null;
>               Event middle = map.containsKey("middle") ? map.get("middle").get(0) : null;
>               Event end = map.containsKey("end") ? map.get("end").get(0) : null;
>
>               StringJoiner joiner = new StringJoiner(",");
>               joiner
>                   .add("Rule : " + nm + " ")
>                   .add(start == null ? "" : start.getId())
>                   .add(middle == null ? "" : middle.getId())
>                   .add(end == null ? "" : end.getId());
>               collector.collect(joiner.toString());
>             }
>           });
>
>   alerts.print();
> }
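> Note that the loop above adds one CEP pattern operator (plus one print sink) per
> rule to the job graph, so even the 200-rule snippet produces a few hundred
> operators. A quick, hypothetical way to confirm the size of the generated graph,
> assuming "env" is the StreamExecutionEnvironment on which eventStream and the
> loop above were built (it is not shown in the snippet), is:
>
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> // Dump the generated plan as JSON to count the operators produced by the rule
> // loop (one CEP pattern operator plus one print sink per rule) before executing.
> static void printPlan(StreamExecutionEnvironment env) {
>   System.out.println(env.getExecutionPlan());
> }
>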
> We tried to run this code on a Flink cluster with 1 task manager and 4
> task slots, and the task manager crashed with the following error:
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
>     at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
>     at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1462)
>     at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1139)
>     at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1079)
>     at org.apache.flink.runtime.executiongraph.Execution.markFailed(Execution.java:910)
>     at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$5(Execution.java:623)
>     at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>     at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>     at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>     at akka.actor.Actor.aroundReceive(Actor.scala:517)
>     at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException: Invocation of public abstract java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutorGateway.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time) timed out.
>     at java.base/java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:367)
>     at java.base/java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:376)
>     at java.base/java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:1019)
>     at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>     at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
>     at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:234)
>     at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>     at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>     at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>     at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
>     at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1079)
>     at akka.dispatch.OnComplete.internal(Future.scala:263)
>     at akka.dispatch.OnComplete.internal(Future.scala:261)
>     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
>     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
>     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
>     at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
>     at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
>     at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
>     at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
>     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
>     at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650)
>     at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
>     at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870)
>     at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
>     at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
>     at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868)
>     at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
>     at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279)
>     at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283)
>     at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:235)
>     at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: java.util.concurrent.TimeoutException: Invocation of public abstract java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutorGateway.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time) timed out.
>     at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:60)
>     at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$4(Execution.java:599)
>     at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     ... 1 more
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@192.168.0.4:52041/user/rpc/taskmanager_0#-1397184270]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
>     at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:635)
>     at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650)
>     at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
>     at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870)
>     at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
>     at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
>     at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868)
>     at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
>     at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279)
>     at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283)
>     at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:235)
>     ... 1 more
>
>
> Can somebody help with this? Why is this code failing? Is our approach
> scalable, or is there a better way of doing this? Considering that every
> CEP operator creates a thread, will this work in production with so many
> threads per task slot? Does the CEP library support combining multiple
> patterns in a single operator/thread?
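>
> To make the last question more concrete: by "combining" we mean evaluating all
> the rules inside a single keyed operator instead of creating one CEP operator
> per rule. A rough, hypothetical sketch of the idea (not using the CEP library,
> and only covering a simplified start -> end sequence rather than the full
> pattern above; it assumes Rule is a serializable POJO and that Event.getId()
> returns a String) would be something like:
>
> import java.util.List;
> import org.apache.flink.api.common.state.MapState;
> import org.apache.flink.api.common.state.MapStateDescriptor;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.util.Collector;
>
> // One keyed operator evaluating every rule, with per-rule progress kept in
> // keyed state instead of one CEP operator (and NFA) per rule.
> public class AllRulesFunction extends KeyedProcessFunction<String, Event, String> {
>
>   private final List<Rule> rules; // same Rule POJO as in the loop above
>   private transient MapState<String, String> progress; // rule name -> id of matched start event
>
>   public AllRulesFunction(List<Rule> rules) {
>     this.rules = rules;
>   }
>
>   @Override
>   public void open(Configuration parameters) {
>     progress = getRuntimeContext().getMapState(
>         new MapStateDescriptor<>("rule-progress", String.class, String.class));
>   }
>
>   @Override
>   public void processElement(Event event, Context ctx, Collector<String> out) throws Exception {
>     String process = event.getProcess();
>     for (Rule rule : rules) {
>       String startId = progress.get(rule.getName());
>       if (startId == null) {
>         if (process.equals(rule.getStart())) {
>           progress.put(rule.getName(), event.getId()); // rule started for this org
>         }
>       } else if (process.equals(rule.getEnd())) {
>         out.collect("Rule : " + rule.getName() + " ," + startId + "," + event.getId());
>         progress.remove(rule.getName()); // reset after a complete match
>       }
>     }
>   }
> }
>
> We would then attach it once per key instead of once per rule, e.g.
> eventStream.keyBy((KeySelector<Event, String>) Event::getOrgid)
>     .process(new AllRulesFunction(ruleList)).print();
> but we are not sure whether this is the recommended approach, or whether the
> CEP library has built-in support for something like it.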
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/