Running multiple CEP pattern rules

Running multiple CEP pattern rules

Tejas
Hi,
We are running into errors when running multiple CEP patterns. Here's our
use case:
We are planning to build a rule-based engine on top of Flink with a huge
number of rules and are doing a POC for it. For the POC we have around 1000
pattern-based rules, which we translate into CEP patterns and run on a
keyed stream of event data to detect matches. We partition the stream by
orgId, and each rule needs to run within each org. Here's the code we've
written to implement that:
DataStream<Event> eventStream = null; // placeholder; a real source goes here
DataStream<Event> partitionedInput =
    eventStream.keyBy((KeySelector<Event, String>) Event::getOrgid);
List<Rule> ruleList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
  ruleList.add(new Rule("rule" + i, "process1", "process2", "process3"));
  ruleList.add(
      new Rule("rule" + (i + 500), "process4", "process5", "process6"));
}
for (Rule rule : ruleList) {
  String st = rule.getStart();
  String mi = rule.getMid();
  String en = rule.getEnd();
  String nm = rule.getName();
  Pattern<Event, ?> pattern =
      Pattern.<Event>begin("start")
          .where(
              new SimpleCondition<Event>() {
                @Override
                public boolean filter(Event value) throws Exception {
                  return value.getProcess().equals(st);
                }
              })
          .followedBy("middle")
          .where(
              new SimpleCondition<Event>() {
                @Override
                public boolean filter(Event event) {
                  return !event.getProcess().equals(mi);
                }
              })
          .optional()
          .followedBy("end")
          .where(
              new SimpleCondition<Event>() {
                @Override
                public boolean filter(Event event) {
                  return event.getProcess().equals(en);
                }
              });
  PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);
  DataStream<String> alerts =
      patternStream.process(
          new PatternProcessFunction<Event, String>() {
            @Override
            public void processMatch(
                Map<String, List<Event>> map, Context context,
                Collector<String> collector) throws Exception {
              Event start = map.containsKey("start") ? map.get("start").get(0) : null;
              Event middle = map.containsKey("middle") ? map.get("middle").get(0) : null;
              Event end = map.containsKey("end") ? map.get("end").get(0) : null;
              StringJoiner joiner = new StringJoiner(",");
              joiner
                  .add("Rule : " + nm + " ")
                  .add(start == null ? "" : start.getId())
                  .add(middle == null ? "" : middle.getId())
                  .add(end == null ? "" : end.getId());
              collector.collect(joiner.toString());
            }
          });
  alerts.print();
}
We tried to run this code on a Flink cluster with 1 task manager and 4
task slots, and the task manager crashed with the following error:
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
        at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
        at
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
        at
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
        at
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
        at
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
        at
org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
        at
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1462)
        at
org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1139)
        at
org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1079)
        at
org.apache.flink.runtime.executiongraph.Execution.markFailed(Execution.java:910)
        at
org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$5(Execution.java:623)
        at
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
        at
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
        at
java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
        at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
        at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
        at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
        at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at akka.actor.Actor.aroundReceive(Actor.scala:517)
        at akka.actor.Actor.aroundReceive$(Actor.scala:515)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.CompletionException:
java.util.concurrent.TimeoutException: Invocation of public abstract
java.util.concurrent.CompletableFuture
org.apache.flink.runtime.taskexecutor.TaskExecutorGateway.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time)
timed out.
        at
java.base/java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:367)
        at
java.base/java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:376)
        at
java.base/java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:1019)
        at
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
        at
java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
        at
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:234)
        at
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
        at
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
        at
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
        at
java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
        at
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1079)
        at akka.dispatch.OnComplete.internal(Future.scala:263)
        at akka.dispatch.OnComplete.internal(Future.scala:261)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
        at
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
        at
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
        at
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
        at
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
        at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
        at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650)
        at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
        at
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870)
        at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
        at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
        at
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868)
        at
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
        at
akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279)
        at
akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283)
        at
akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:235)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.util.concurrent.TimeoutException: Invocation of public
abstract java.util.concurrent.CompletableFuture
org.apache.flink.runtime.taskexecutor.TaskExecutorGateway.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time)
timed out.
        at
org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:60)
        at
org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$4(Execution.java:599)
        at
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
        at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        ... 1 more
Caused by: akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka.tcp://flink@192.168.0.4:52041/user/rpc/taskmanager_0#-1397184270]]
after [10000 ms]. Message of type
[org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation]. A typical
reason for `AskTimeoutException` is that the recipient actor didn't send a
reply.
        at
akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:635)
        at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650)
        at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
        at
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870)
        at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
        at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
        at
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868)
        at
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
        at
akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279)
        at
akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283)
        at
akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:235)
        ... 1 more


Can somebody help with this? Why is this code failing? Is our approach
scalable, or is there a better way of doing this? Considering that every
CEP operator creates a thread, will this work in production with so many
threads per task slot? Does the CEP library support combining multiple
patterns in a single operator/thread?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Running multiple CEP pattern rules

Dawid Wysakowicz-2
Hi Tejas,

It will not work that way. Bear in mind that every application of
CEP.pattern creates a new operator in the job graph. The exceptions you
are seeing most probably come from building that huge graph and shipping
it to the task manager: you are hitting the RPC timeout while submitting
it. You can have many different patterns in a single job, but the number
of vertices in your graph is not unlimited.

In your scenario I'd try to combine the rules in a single operator. You
could use a ProcessFunction for that.
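As a rough illustration of that idea, here is a Flink-independent sketch of the matching logic such an operator could run: one matcher evaluates all rules, and its per-key state would live in keyed state inside a single KeyedProcessFunction rather than in one CEP operator per rule. The Rule and Event shapes are assumptions based on the snippet above, and the pattern is simplified to the start/end pair (the optional middle step is omitted):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal stand-ins for the Rule/Event types from the original snippet.
record Rule(String name, String start, String mid, String end) {}
record Event(String id, String process) {}

/**
 * Evaluates many start->end rules in one place. In a real job this state
 * would be kept in Flink keyed state inside one KeyedProcessFunction,
 * instead of building one CEP operator per rule.
 */
class MultiRuleMatcher {
  private final List<Rule> rules;
  // Per rule name: the pending "start" event, waiting for its "end".
  private final Map<String, Event> pendingStart = new HashMap<>();

  MultiRuleMatcher(List<Rule> rules) {
    this.rules = rules;
  }

  /** Feeds one event through every rule; returns alerts for completed matches. */
  List<String> onEvent(Event e) {
    List<String> alerts = new ArrayList<>();
    for (Rule r : rules) {
      if (e.process().equals(r.start())) {
        pendingStart.put(r.name(), e); // rule armed, waiting for its end event
      } else if (e.process().equals(r.end())) {
        Event start = pendingStart.remove(r.name());
        if (start != null) {
          alerts.add("Rule : " + r.name() + " ," + start.id() + "," + e.id());
        }
      }
    }
    return alerts;
  }
}
```

With this shape, adding a rule adds one entry to a list rather than one vertex to the job graph, so the graph size no longer grows with the rule count.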

Best,

Dawid

On 28/05/2021 01:53, Tejas wrote:

> Hi,
> We are running into errors when running multiple CEP patterns. Here’s our
> use-case :
> [...]



Re: Running multiple CEP pattern rules

Tejas
Hi Dawid,
Does that mean we'll have to implement our own pattern matching in a
ProcessFunction? Or is there a way to hook into either SQL or CEP and
reuse their pattern-matching functionality from within a ProcessFunction?
Also, do you have any plans to bring this functionality to Flink CEP in
the future?



Re: Running multiple CEP pattern rules

Dawid Wysakowicz-2
I am afraid there is not much active development going on in the CEP
library. I would not expect new features there in the near future.

On 28/05/2021 22:00, Tejas wrote:
> Hi Dawid,
> Do you have any plans to bring this functionality in flink CEP in future ?

