Hi everyone,
I have a big stream A, filtered by flags from a small stream B, then unioned with another stream C to become the input for my CEP. As the three streams A, B and C are all keyed, I expected the stream produced by connecting/unioning them to be keyed as well, so I used /reinterpretAsKeyedStream/ before passing it to CEP. With this, I get an /IllegalArgumentException/ (full stack trace below). If I use a parallelism of 1, or if I don't use reinterpretAsKeyedStream (and call /keyBy/ manually instead), there is no such exception.

I don't know how to debug this error, and I'm not sure whether I should use keyed streams with CEP at all.

Thanks and best regards,
Averell

My code:

    val cepInput = streamA.keyBy(r => (r.id1, r.id2))
      .connect(streamB.keyBy(r => (r.id1, r.id2)))
      .flatMap(new MyCandidateFilterFunction())
      .union(streamC.keyBy(r => (r.id1, r.id2)))

    val cepOutput = MyCEP(
      new DataStreamUtils(cepInput).reinterpretAsKeyedStream(r => (r.id1, r.id2)),
      counter1, counter2, threshold1, threshold2)

    object MyCEP {
      def apply(input: KeyedStream[Event, _], longPeriod: Int, threshold: Int, shortPeriod: Int): DataStream[Event] = {
        val patternLineIsUp = Pattern.begin[Event]("period1")
          .where((value: Event, ctx: CepContext[Event]) =>
            accSum(_.counter, Seq("period1"), value, ctx) < threshold)
          .times(longPeriod - shortPeriod).consecutive()
          .next("period2")
          .where((value: Event, ctx: CepContext[Event]) =>
            accSum(_.counter, Seq("period1", "period2"), value, ctx) < threshold && value.status == "up")
          .times(shortPeriod).consecutive()

        collectPattern(input, patternLineIsUp)
      }

      // Sum of f over all events already matched for the given pattern names, plus the current event.
      private def accSum(f: Event => Long, keys: Seq[String], currentEvent: Event, ctx: CepContext[Event]): Long = {
        keys.map(key => ctx.getEventsForPattern(key).map(f).sum).sum + f(currentEvent)
      }

      // Emits the last event matched by "period2" for every complete match.
      private def collectPattern(inputStream: KeyedStream[Event, _], pattern: Pattern[Event, Event]): DataStream[Event] =
        CEP.pattern(inputStream, pattern)
          .process((map: util.Map[String, util.List[Event]], ctx: PatternProcessFunction.Context, collector: Collector[Event]) => {
            val records = map.get("period2")
            collector.collect(records.get(records.size() - 1))
          })
    }

The exception:

    Exception in thread "main" 12:43:13,103 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopped Akka RPC service.
    org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
        at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
        at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
        at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
        at com.mycompany.StreamingJob$.main(Streaming.scala:440)
        at com.mycompany.StreamingJob.main(Streaming.scala)
    Caused by: java.lang.IllegalArgumentException
        at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
        at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:158)
        at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:147)
        at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForElement(HeapPriorityQueueSet.java:154)
        at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
        at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:49)
        at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:215)
        at org.apache.flink.cep.operator.CepOperator.saveRegisterWatermarkTimer(CepOperator.java:285)
        at org.apache.flink.cep.operator.CepOperator.processElement(CepOperator.java:266)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
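For readers following along, the variant described in the post as working (calling keyBy manually instead of reinterpretAsKeyedStream) could look roughly like the sketch below. It is only an illustration: the Event case class, the object name KeyByAfterUnion, the helper keyedUnion and the toy inputs are assumptions made up for this example, not the poster's actual code. The idea is to union first and key the result afterwards, which costs an extra shuffle.

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical stand-in for the Event type from the post; only the fields used here.
case class Event(id1: String, id2: String, counter: Long, status: String)

object KeyByAfterUnion {
  // Unions the two inputs first and only then keys the result, so the
  // hash partitioning is applied on the edge that feeds the keyed operator.
  def keyedUnion(filteredA: DataStream[Event],
                 streamC: DataStream[Event]): KeyedStream[Event, (String, String)] =
    filteredA.union(streamC).keyBy(r => (r.id1, r.id2))

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(4)

    // Toy inputs standing in for the filtered stream A and for stream C.
    val filteredA = env.fromElements(Event("a", "x", 1L, "up"), Event("b", "y", 2L, "down"))
    val streamC = env.fromElements(Event("a", "x", 3L, "up"), Event("c", "z", 4L, "up"))

    keyedUnion(filteredA, streamC).print()
    env.execute("keyBy after union")
  }
}
```

The KeyedStream returned by keyedUnion could then be passed to CEP.pattern in place of the reinterpreted stream.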
Hi Averell
I think this happens because, after the 'union', the input stream no longer satisfies the requirement that its records be pre-partitioned in EXACTLY the same way Flink's keyBy would partition them [1]. An easy way to verify this is to follow [2] and check whether each sub-task of the unioned stream holds exactly the keys that the corresponding downstream task expects.
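One way such a check might be written, as a rough sketch only: the class name MisroutedRecordFinder is made up for this example, and it relies on KeyGroupRangeAssignment, which lives in flink-runtime and is an internal class rather than a stable public API. It assumes the key extractor passed in is exactly the one that keyBy would use, since Flink derives the key group from the key's hashCode.

```scala
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.KeyGroupRangeAssignment
import org.apache.flink.util.Collector

// Hypothetical diagnostic operator: reports every record that sits on a
// subtask other than the one Flink's keyBy would have routed it to.
class MisroutedRecordFinder[T](keyOf: T => AnyRef) extends RichFlatMapFunction[T, String] {
  @transient private var subtask: Int = _
  @transient private var parallelism: Int = _
  @transient private var maxParallelism: Int = _

  override def open(parameters: Configuration): Unit = {
    val ctx = getRuntimeContext
    subtask = ctx.getIndexOfThisSubtask
    parallelism = ctx.getNumberOfParallelSubtasks
    maxParallelism = ctx.getMaxNumberOfParallelSubtasks
  }

  override def flatMap(value: T, out: Collector[String]): Unit = {
    val key = keyOf(value)
    // Subtask index that keyBy would choose for this key.
    val expected =
      KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism)
    if (expected != subtask) {
      out.collect(s"key $key is on subtask $subtask, keyBy would route it to $expected")
    }
  }
}
```

Attached to the unioned stream with the same parallelism as the CEP operator, for example `cepInput.flatMap(new MisroutedRecordFinder[Event](r => (r.id1, r.id2))).print()`, any printed line would point to a record that is not where keyBy would have placed it.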
Best
Yun Tang
From: Averell <[hidden email]>
Sent: Sunday, May 5, 2019 16:43
To: [hidden email]
Subject: IllegalArgumentException with CEP & reinterpretAsKeyedStream
Thank you Yun.

I haven't yet followed your guide to check (it will take me some time to work out how). However, I can now confirm that the "union" is the culprit. In my Flink Console GUI, I can see that the link from stream C to CEP via "union" is a FORWARD link, not a HASH one, which means that the "keyBy" right before the "union" has no effect at all. If I put a placebo "map" between the "keyBy" on streamC and the "union", the problem is solved:

    .union(streamC.keyBy(r => (r.id1, r.id2)).map(r => r))

I don't know why "union" behaves like that, though. I could not find it mentioned in any documentation.

Thanks a lot for your help.

Regards,
Averell

On Sun, May 5, 2019 at 11:22 PM Yun Tang <[hidden email]> wrote:
Hi Averell
Would you please share the job graph from the Flink web UI, to illustrate what changes after you append the map operator?
Best
Yun Tang
From: Le-Van Huyen <[hidden email]>
Sent: Monday, May 6, 2019 11:15
To: Yun Tang
Cc: [hidden email]
Subject: Re: IllegalArgumentException with CEP & reinterpretAsKeyedStream
Hello Yun,

Below is the graph when there is no "map". The output of the top-left box is my stream A. The line from the bottom-left box to the top-right box is my stream B. The "co-flatmap" is a filter. The line from the bottom-left box to the right box is my stream C.

And here below is the version with "map".

Regards,
Averell

On Mon, May 6, 2019 at 9:30 PM Yun Tang <[hidden email]> wrote: