IllegalArgumentException with CEP & reinterpretAsKeyedStream

Averell
Hi everyone,

I have a big stream A, filtered by flags from a small stream B, then unioned
with another stream C to become the input for my CEP.
As the three streams A, B, C are all keyed, I expected that the stream
resulting from connecting/unioning them would also be keyed, so I
used /reinterpretAsKeyedStream/ before feeding it into CEP. With this, I
got an /IllegalArgumentException/ (full stack trace below).
If I use a parallelism of 1, or if I don't use reinterpretAsKeyedStream (and
use /keyBy/ manually instead), there's no such exception.

I don't know how to debug this error, and I'm not sure whether I should be
using keyed streams with CEP.

Thanks and best regards,
Averell


My code:
    val cepInput = streamA.keyBy(r => (r.id1, r.id2))
      .connect(streamB.keyBy(r => (r.id1, r.id2)))
      .flatMap(new MyCandidateFilterFunction())
      .union(streamC.keyBy(r => (r.id1, r.id2)))

    val cepOutput =
      MyCEP(new DataStreamUtils(cepInput).reinterpretAsKeyedStream(r => (r.id1, r.id2)),
        counter1, counter2,
        threshold1, threshold2)

    object MyCEP {
      def apply(input: KeyedStream[Event, _],
                longPeriod: Int,
                threshold: Int,
                shortPeriod: Int): DataStream[Event] = {

        // "period1": (longPeriod - shortPeriod) consecutive events whose accumulated counter stays below threshold
        val patternLineIsUp = Pattern.begin[Event]("period1")
          .where((value: Event, ctx: CepContext[Event]) =>
            accSum(_.counter, Seq("period1"), value, ctx) < threshold)
          .times(longPeriod - shortPeriod).consecutive()
          // "period2": shortPeriod further consecutive events, still below threshold and with status "up"
          .next("period2")
          .where((value: Event, ctx: CepContext[Event]) =>
            accSum(_.counter, Seq("period1", "period2"), value, ctx) < threshold && value.status == "up")
          .times(shortPeriod).consecutive()

        collectPattern(input, patternLineIsUp)
      }

      // Sum of f over the events already matched under the given pattern names, plus the current event
      private def accSum(f: Event => Long, keys: Seq[String], currentEvent: Event,
                         ctx: CepContext[Event]): Long = {
        keys.map(key => ctx.getEventsForPattern(key).map(f).sum).sum + f(currentEvent)
      }

      // Emits the last event of "period2" for every complete match
      private def collectPattern(inputStream: KeyedStream[Event, _],
                                 pattern: Pattern[Event, Event]): DataStream[Event] =
        CEP.pattern(inputStream, pattern)
          .process((map: util.Map[String, util.List[Event]], ctx: PatternProcessFunction.Context,
                    collector: Collector[Event]) => {
            val records = map.get("period2")
            collector.collect(records.get(records.size() - 1))
          })
    }

The exception:
Exception in thread "main" 12:43:13,103 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Stopped Akka RPC service.
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
        at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
        at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
        at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
        at com.mycompany.StreamingJob$.main(Streaming.scala:440)
        at com.mycompany.StreamingJob.main(Streaming.scala)
Caused by: java.lang.IllegalArgumentException
        at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
        at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:158)
        at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:147)
        at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForElement(HeapPriorityQueueSet.java:154)
        at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
        at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:49)
        at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:215)
        at org.apache.flink.cep.operator.CepOperator.saveRegisterWatermarkTimer(CepOperator.java:285)
        at org.apache.flink.cep.operator.CepOperator.processElement(CepOperator.java:266)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)
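Reading the innermost frames bottom-up: CepOperator.processElement registers an event-time timer for the current key, the heap timer queue (HeapPriorityQueueSet) looks up the per-key-group structure for that key, and Preconditions.checkArgument fails inside globalKeyGroupToLocalIndex. The Scala sketch below is a simplified, hypothetical model of that last step. It assumes that each keyed subtask only stores timers for its own contiguous range of key groups; `OwnedKeyGroups` and `LocalTimerQueue` are illustrative names, not Flink classes. Scala's `require` throws a bare `IllegalArgumentException`, much like the one in the trace.

```scala
// Hypothetical model of a per-subtask timer store indexed by key group.
// It only illustrates the range check; it is not Flink's HeapPriorityQueueSet.
final case class OwnedKeyGroups(start: Int, end: Int) {
  def contains(keyGroup: Int): Boolean = keyGroup >= start && keyGroup <= end
}

final class LocalTimerQueue(owned: OwnedKeyGroups) {
  // One bucket of timer timestamps per key group owned by this subtask.
  private val buckets: Array[List[Long]] =
    Array.fill(owned.end - owned.start + 1)(List.empty[Long])

  // Maps a global key group to a local bucket index. A key group that this
  // subtask does not own has no bucket, so require(...) throws an
  // IllegalArgumentException, analogous to the checkArgument in the trace.
  private def localIndex(keyGroup: Int): Int = {
    require(owned.contains(keyGroup))
    keyGroup - owned.start
  }

  def registerTimer(keyGroup: Int, timestamp: Long): Unit = {
    val i = localIndex(keyGroup)
    buckets(i) = timestamp :: buckets(i)
  }

  def timersFor(keyGroup: Int): List[Long] = buckets(localIndex(keyGroup))
}
```

For instance, if a subtask owns key groups 0 to 63 (the first half of 128 key groups at parallelism 2, in this model), a record whose key falls into key group 100 can only reach it if the input was not partitioned the way keyBy would partition it. With parallelism 1 a single subtask owns every key group, which would be consistent with the error disappearing in that case.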



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: IllegalArgumentException with CEP & reinterpretAsKeyedStream

Yun Tang
Hi Averell

I think this is because, after the 'union', the input stream no longer satisfies the requirement that it be pre-partitioned in EXACTLY the same way Flink’s keyBy would partition the data [1]. An easy way to verify this is to follow [2] and check whether each sub-task of the union stream contains exactly the keys that the corresponding downstream task expects.
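The requirement in [1] can be made concrete with a little arithmetic. As I understand Flink's key-group scheme, keyBy hashes each key into one of maxParallelism key groups, and every key group is owned by exactly one downstream subtask, so a record must arrive at the subtask that owns its key group. The Scala sketch below models that routing. The hash is written to follow Flink's MathUtils.murmurHash and the index/range formulas to follow KeyGroupRangeAssignment, but treat them as an approximation rather than a copy. The names in `KeyGroupSketch`, the example max parallelism of 128, and the forward routing rule used in the test (record i stays on subtask i % parallelism) are hypothetical.

```scala
object KeyGroupSketch {
  // Integer hash in the style of Flink's MathUtils.murmurHash (assumed, not copied verbatim).
  def murmurHash(input: Int): Int = {
    var code = input
    code *= 0xcc9e2d51L.toInt
    code = java.lang.Integer.rotateLeft(code, 15)
    code *= 0x1b873593L.toInt
    code = java.lang.Integer.rotateLeft(code, 13)
    code = code * 5 + 0xe6546b64L.toInt
    code ^= 4
    // final avalanche mix
    code ^= code >>> 16
    code *= 0x85ebca6bL.toInt
    code ^= code >>> 13
    code *= 0xc2b2ae35L.toInt
    code ^= code >>> 16
    // map to a non-negative value
    if (code >= 0) code else if (code != Int.MinValue) -code else 0
  }

  // keyBy: key -> key group in [0, maxParallelism)
  def keyGroupFor(key: Any, maxParallelism: Int): Int =
    murmurHash(key.hashCode) % maxParallelism

  // key group -> index of the subtask that owns it
  def operatorIndexFor(keyGroup: Int, maxParallelism: Int, parallelism: Int): Int =
    keyGroup * parallelism / maxParallelism

  // inclusive range of key groups owned by subtask `index`
  def keyGroupRangeFor(index: Int, maxParallelism: Int, parallelism: Int): (Int, Int) = (
    (index * maxParallelism + parallelism - 1) / parallelism,
    ((index + 1) * maxParallelism - 1) / parallelism
  )

  def owns(index: Int, keyGroup: Int, maxParallelism: Int, parallelism: Int): Boolean = {
    val (start, end) = keyGroupRangeFor(index, maxParallelism, parallelism)
    keyGroup >= start && keyGroup <= end
  }
}
```

Routing by key group always lands on the owning subtask, whereas routing that ignores the key (as a FORWARD edge does when its upstream was not key-partitioned) will, at parallelism greater than 1, almost certainly deliver some records to a subtask that does not own their key group. That is the situation the range check in the stack trace rejects.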

Best
Yun Tang




From: Averell <[hidden email]>
Sent: Sunday, May 5, 2019 16:43
To: [hidden email]
Subject: IllegalArgumentException with CEP & reinterpretAsKeyedStream
 

Re: IllegalArgumentException with CEP & reinterpretAsKeyedStream

Averell
Thank you Yun.

I haven't tried your suggested check yet (it would take me some time to work out how to do it).
However, I can now confirm that "union" is the culprit. In the Flink web UI, I can see that the link from streamC to CEP via "union" is a FORWARD link, not a HASH one, which means that the "keyBy" right before the "union" has no effect at all. If I put a placebo "map" between the "keyBy" on streamC and the "union", the problem is solved (.union(streamC.keyBy(r => (r.id1, r.id2)).map(r => r))).

I don't know why "union" behaves like that, though, and I could not find it mentioned in any documentation.
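One plausible explanation for this behaviour, offered as an assumption about how Flink builds the stream graph rather than as documented behaviour: keyBy, union and reinterpretAsKeyedStream do not create operators of their own but virtual nodes, and when CEP is wired to the real operators behind them, the partitioner closest to CEP takes precedence. reinterpretAsKeyedStream contributes a FORWARD partitioner, which would override the HASH partitioner of the keyBy on streamC and produce exactly the FORWARD edge seen in the web UI, while an identity map is a real operator at which the HASH edge can end. The Scala sketch below is a hypothetical, heavily simplified model of that resolution rule; `Node`, `Operator`, `Partition`, `Union` and `EdgeResolver` are illustrative names, not Flink classes.

```scala
// Hypothetical model of stream-graph edge resolution; not Flink's classes.
sealed trait Node
final case class Operator(name: String) extends Node
// Virtual node that only attaches a partitioner (e.g. keyBy, reinterpretAsKeyedStream).
final case class Partition(input: Node, partitioner: String) extends Node
// Virtual node that only merges its inputs.
final case class Union(inputs: List[Node]) extends Node

object EdgeResolver {
  // Returns one (upstream operator, partitioner) pair per physical edge into the consumer.
  // Walking upstream, the first partitioner met (the one closest to the consumer) is kept;
  // an edge with no partitioner at all defaults to FORWARD (equal parallelism assumed).
  def resolve(input: Node, chosen: Option[String] = None): List[(String, String)] =
    input match {
      case Operator(name)   => List(name -> chosen.getOrElse("FORWARD"))
      case Partition(in, p) => resolve(in, chosen.orElse(Some(p)))
      case Union(ins)       => ins.flatMap(in => resolve(in, chosen))
    }
}
```

Under this model, the robust alternatives are the ones already found in this thread: a real operator between keyBy and union, or a plain keyBy on the union output instead of reinterpretAsKeyedStream.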

Thanks a lot for your help.

Regards,
Averell


On Sun, May 5, 2019 at 11:22 PM Yun Tang <[hidden email]> wrote:

Re: IllegalArgumentException with CEP & reinterpretAsKeyedStream

Yun Tang
Hi Averell

Could you please share screenshots of the job graph in the Flink web UI showing the change after you append the map operator?

Best
Yun Tang

From: Le-Van Huyen <[hidden email]>
Sent: Monday, May 6, 2019 11:15
To: Yun Tang
Cc: [hidden email]
Subject: Re: IllegalArgumentException with CEP & reinterpretAsKeyedStream
 

Re: IllegalArgumentException with CEP & reinterpretAsKeyedStream

Averell
Hello Yun,

Below is the graph without the "map". The output of the top-left box is my stream A. The line from the bottom-left box to the top-right box is my stream B. The "co-flatmap" is a filter.
The line from the bottom-left box to the right box is my stream C.
Screen Shot 2019-05-07 at 10.06.15.png

And below is the version with the "map":
Screen Shot 2019-05-07 at 10.04.56.png

Regards,
Averell


On Mon, May 6, 2019 at 9:30 PM Yun Tang <[hidden email]> wrote: