Flink CEP AbstractCEPPatternOperator fail after event detection

norman sp
Hi,
I'm trying out the new CEP library but have run into a problem with event detection.
In my case, Flink detects the event pattern: A followed by B within 10 seconds.
But shortly after the event is detected, once the pattern no longer matches, the program crashes with this error message:


04/06/2016 11:04:47 Job execution switched to status FAILING.
java.lang.NullPointerException
        at org.apache.flink.cep.nfa.SharedBuffer.extractPatterns(SharedBuffer.java:205)
        at org.apache.flink.cep.nfa.NFA.extractPatternMatches(NFA.java:305)
        at org.apache.flink.cep.nfa.NFA.process(NFA.java:142)
        at org.apache.flink.cep.operator.AbstractCEPPatternOperator.processEvent(AbstractCEPPatternOperator.java:93)
        at org.apache.flink.cep.operator.CEPPatternOperator.processWatermark(CEPPatternOperator.java:88)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:158)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)


After that, the job is restarted and runs fine until the AbstractCEPPatternOperator fails again.

That's my code:

Pattern<Tuple5<String, String, Double, Double, Double>, ?> FlowFirstPattern =
        Pattern.<Tuple5<String, String, Double, Double, Double>>begin("start")
                .followedBy("FlowOver10")
                .where(new FilterFunction<Tuple5<String, String, Double, Double, Double>>() { /* some filter */ })
                .followedBy("PressureOver10")
                .where(new FilterFunction<Tuple5<String, String, Double, Double, Double>>() { /* some filter */ })
                .within(Time.seconds(10));

PatternStream<Tuple5<String, String, Double, Double, Double>> FlowFirstPatternStream = CEP.pattern(windowedData, FlowFirstPattern);
DataStream<String> warning = FlowFirstPatternStream.select(new FlowPatternWarning());
warning.print();

private static class FlowPatternWarning implements PatternSelectFunction<Tuple5<String, String, Double, Double, Double>, String> {
    @Override
    public String select(Map<String, Tuple5<String, String, Double, Double, Double>> pat) throws Exception {
        // Look up the matched events by the names given in the pattern.
        Tuple5<String, String, Double, Double, Double> pressure = pat.get("PressureOver10");
        Tuple5<String, String, Double, Double, Double> flow = pat.get("FlowOver10");

        return "  #######   Warning! FlowPattern   ####### " + pressure.toString() + " - " + flow.toString();
    }
}


How can I solve that?
Hope somebody could help me.

greetz Norman

Re: Flink CEP AbstractCEPPatternOperator fail after event detection

Till Rohrmann
Hi Norman,

Which version of Flink are you using? We recently fixed some issues in the CEP library that looked similar to your error message. The problem occurred when using the CEP library with processing time. Switching to event time or ingestion time solves the problem.

The fixes that make it work with processing time as well are included in the latest snapshot version, 1.1-SNAPSHOT, and will be part of the upcoming 1.0.1 bugfix release, which will actually be released today.

If the problem persists with the latest version, it would be good to see your complete Flink program.

Cheers,
Till


Re: Flink CEP AbstractCEPPatternOperator fail after event detection

norman sp
Hi Till,
I used Flink version 1.0.0 and tried all three TimeCharacteristics.
Now I have tried the new Flink 1.0.1, which gives me the following error.
After detecting an event, it processes a few more stream tuples and then crashes.
I'm not sure how to resolve this part of the error message: "This can indicate that the element belonging to the previous relation has been already pruned, even though you expect it to be still there"

4>   #######   Warning! FlowPatternEvent:   #######
4> (7605e43c-ca20-4524-af5f-23fd8e55d7b9,Intensity - Value,25.0,3.0,75.0)
4> (5025ef64-2b83-4112-b9bd-2d7de46454c9,Pressure - Value,7.0,3.0,21.0)
4> (3d501c5b-b0e2-41e6-bfdc-14c996df6e19,Flow - Value,27.0,3.0,81.0)
04/06/2016 16:41:31 Map -> Sink: Unnamed(4/4) switched to CANCELED
04/06/2016 16:41:31 AbstractCEPPatternOperator(1/1) switched to FAILED
java.lang.RuntimeException: Failure happened in filter function.
        at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:292)
        at org.apache.flink.cep.nfa.NFA.process(NFA.java:136)
        at org.apache.flink.cep.operator.AbstractCEPPatternOperator.processEvent(AbstractCEPPatternOperator.java:93)
        at org.apache.flink.cep.operator.CEPPatternOperator.processWatermark(CEPPatternOperator.java:88)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:158)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: Could not find previous shared buffer entry with key: State(start, Normal, [
StateTransition(TAKE, FlowOver10, with filter),
StateTransition(IGNORE, start),
]), value: (83788338-b26c-4538-a437-77d1bbf7b348,Pressure - Value,7.0,5.0,35.0) and timestamp: 1459953675999. This can indicate that the element belonging to the previous relation has been already pruned, even though you expect it to be still there.
        at org.apache.flink.cep.nfa.SharedBuffer.put(SharedBuffer.java:104)
        at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:269)
        ... 8 more


Re: Flink CEP AbstractCEPPatternOperator fail after event detection

Till Rohrmann
Hi Norman,

This error is exactly what I thought I had fixed. I guess there is still another case where premature pruning can happen in the SharedBuffer. Could you send me the example code that reproduces the error? The input data would also be very helpful. Then I can debug it. Thanks :-)
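
For readers unfamiliar with this failure mode, here is a toy model of premature pruning. It is only a sketch under stated assumptions, not Flink's SharedBuffer: a `TreeMap` keyed by timestamp stands in for the buffer, and the class and method names (`PrematurePruningSketch`, `pruneOlderThan`, `putAfter`) are hypothetical. It shows how a lookup of a predecessor entry fails once that entry has been dropped too early.

```java
import java.util.TreeMap;

// Toy stand-in for a buffer of partial matches, keyed by event timestamp.
// This is NOT Flink's SharedBuffer; all names here are hypothetical.
final class PrematurePruningSketch {
    private final TreeMap<Long, String> entries = new TreeMap<>();

    // Stores the first event of a partial match.
    void put(long timestamp, String event) {
        entries.put(timestamp, event);
    }

    // Drops every entry whose timestamp is strictly smaller than the given one.
    void pruneOlderThan(long timestamp) {
        entries.headMap(timestamp).clear();
    }

    // Appends an event that must link back to an existing predecessor entry.
    void putAfter(long previousTimestamp, long timestamp, String event) {
        if (!entries.containsKey(previousTimestamp)) {
            throw new IllegalStateException(
                "Could not find previous entry with timestamp " + previousTimestamp);
        }
        entries.put(timestamp, event);
    }

    public static void main(String[] args) {
        PrematurePruningSketch buffer = new PrematurePruningSketch();
        buffer.put(1_000L, "flow");
        // Pruning is too eager here: the "flow" entry is still needed below.
        buffer.pruneOlderThan(2_000L);
        try {
            buffer.putAfter(1_000L, 1_500L, "pressure");
        } catch (IllegalStateException e) {
            System.out.println("Lookup failed: " + e.getMessage());
        }
    }
}
```

In this model the second event arrives while its predecessor is still expected to be present, which mirrors the wording of the error message quoted above.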

Cheers,
Till


Re: Flink CEP AbstractCEPPatternOperator fail after event detection

norman sp
Hi Till,
Thank you. Here's the code:

import java.util.Map;
import java.util.UUID;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

public class CepStorzSimulator {

    public static void main(String[] args) throws Exception {

        final ParameterTool parameterTool = ParameterTool.fromArgs(args);

        if (parameterTool.getNumberOfParameters() < 3) {
            System.out.println("Missing parameters!\nUsage: Kafka --topic <topic> --bootstrap.servers <kafka brokers> --group.id <some id>");
            System.exit(1);
        }

        CepStorzSimulator reader = new CepStorzSimulator();
        reader.run(parameterTool);
    }

    public void run(ParameterTool parameterTool) throws Exception {

        String topic = "test-simulator";

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.getConfig().disableSysoutLogging();
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 5000));
        //env.enableCheckpointing(15000); // create a checkpoint every 15 seconds
        env.setParallelism(4);
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        DataStream<String> kafkaStream = env.addSource(new FlinkKafkaConsumer08<>(topic, new SimpleStringSchema(), parameterTool.getProperties()));

        DataStream<Tuple5<String, String, String, String, Double>> data = kafkaStream.flatMap(new SplitMapper());

        // Keep only the "target - Value" parameters, key by parameter name (field 3),
        // and fold each sliding window (10 s size, 1 s slide) into a count and a sum.
        SingleOutputStreamOperator<Tuple6<String, String, String, Double, Double, Double>> windowedData =
                data.filter(new FilterFunction<Tuple5<String, String, String, String, Double>>() {

                        private static final long serialVersionUID = -5952425756492833594L;

                        @Override
                        public boolean filter(Tuple5<String, String, String, String, Double> val) throws Exception {

                            return val.f3.contains("target - Value");
                        }
                    })
                    .keyBy(3)
                    .timeWindow(Time.seconds(10), Time.seconds(1))
                    .fold(new Tuple6<>("", "", "", 0.0d, 0.0d, 0.0d), new pressureElementCount());

        windowedData.print();

        // Pattern: average flow over 25, followed by average pressure over 5, within 10 seconds.
        Pattern<Tuple6<String, String, String, Double, Double, Double>, ?> FlowFirstPattern =
                Pattern.<Tuple6<String, String, String, Double, Double, Double>>begin("FlowOver10")
                    .where(new FilterFunction<Tuple6<String, String, String, Double, Double, Double>>() {

                        private static final long serialVersionUID = 5861517245439863889L;

                        @Override
                        public boolean filter(Tuple6<String, String, String, Double, Double, Double> value) throws Exception {

                            // f5 is the window sum and f4 the window count (see pressureElementCount).
                            double avgFlow = (value.f5 / value.f4);

                            return value.f2.contains("Flow target - Value") && avgFlow > 25.0;// && (value.f2 > avgFlow*1.0);
                        }
                    })
                    .followedBy("PressureOver10").where(new FilterFunction<Tuple6<String, String, String, Double, Double, Double>>() {

                        private static final long serialVersionUID = -4037517308930307522L;

                        @Override
                        public boolean filter(Tuple6<String, String, String, Double, Double, Double> value) throws Exception {

                            double avgPressure = (value.f5 / value.f4);
                            //System.out.println("Pressure: " + avgPressure);

                            return value.f2.equals("Pressure target - Value") && (avgPressure > 5.0);// && (value.f2 > avgPressure*1.0);
                        }
                    })
                    .within(Time.seconds(10));

        PatternStream<Tuple6<String, String, String, Double, Double, Double>> FlowFirstPatternStream = CEP.pattern(windowedData, FlowFirstPattern);
        DataStream<String> warning = FlowFirstPatternStream.select(new PlacingWorkingTrocarWarning());
        warning.print();

        env.execute();
    }

    private static class PlacingWorkingTrocarWarning implements PatternSelectFunction<Tuple6<String, String, String, Double, Double, Double>, String> {

        private static final long serialVersionUID = 2576609635170800026L;

        @Override
        public String select(Map<String, Tuple6<String, String, String, Double, Double, Double>> pat) throws Exception {

            //Tuple5<String, String, Double, Double, Double> pressure = pat.get("PressureOver10");
            //Tuple5<String, String, Double, Double, Double> flow = pat.get("FlowOver10");

            return "  #######   Warning! FlowEvent   ####### ";
        }
    }

    // Accumulates (id, test name, parameter, last value, count, sum) over a window.
    private static class pressureElementCount implements FoldFunction<Tuple5<String, String, String, String, Double>, Tuple6<String, String, String, Double, Double, Double>> {

        private static final long serialVersionUID = -1081752808506520154L;

        @Override
        public Tuple6<String, String, String, Double, Double, Double> fold(Tuple6<String, String, String, Double, Double, Double> init, Tuple5<String, String, String, String, Double> val) throws Exception {

            double count = init.f4 + 1.0d;
            double sum = init.f5 + val.f4; //!!!
            return new Tuple6<>(val.f0, val.f1, val.f3, val.f4, count, sum);
        }
    }

    // Splits a tab-separated Kafka message into one tuple per JSON key/value pair.
    private static class SplitMapper extends RichFlatMapFunction<String, Tuple5<String, String, String, String, Double>> {

        private static final long serialVersionUID = 7297664214330222193L;

        @Override
        public void flatMap(String msg, Collector<Tuple5<String, String, String, String, Double>> out) throws Exception {

            // EncodeValues is not shown in this post.
            EncodeValues enc = new EncodeValues();
            getRuntimeContext().getLongCounter("eventCount").add(1L);
            String[] split_msg = msg.split("\t");
            String DeviceId = split_msg[1];
            String[] array = split_msg[3].split(", \"");

            for (String a : array) {

                String[] split = a.split(":");
                String val = split[1];
                String testname = "test1";

                String nom = val.replace("\"", "").replace("{", "").replace("}", "").replace(",", ".");
                String param = split[0].replace("\"", "").replace("{", "").replace("}", "");

                double codedVal = enc.encode(nom);

                out.collect(new Tuple5<String, String, String, String, Double>(UUID.randomUUID().toString(), testname, DeviceId, param, codedVal));
            }
        }
    }
}
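
To make the windowed aggregation easier to follow, here is a plain-Java sketch (no Flink required) of what the `pressureElementCount` fold and the `value.f5/value.f4` expressions in the filters compute: a running count and sum, and then the average compared against a threshold. The class name `FoldAverageSketch`, the array-based accumulator, and the input values are illustrative assumptions, not part of the original program.

```java
// Plain-Java sketch of the count/sum fold and the average check.
// Names and input values are hypothetical; only the arithmetic mirrors the post.
final class FoldAverageSketch {

    // Accumulator layout: acc[0] = count, acc[1] = sum (f4 and f5 in the Tuple6).
    static double[] fold(double[] acc, double value) {
        return new double[] {acc[0] + 1.0d, acc[1] + value};
    }

    // Average over the window, as computed in the pattern filters (sum / count).
    static double average(double[] acc) {
        return acc[1] / acc[0];
    }

    public static void main(String[] args) {
        double[] flowValues = {24, 25, 26, 27, 28, 28}; // made-up window contents
        double[] acc = {0.0d, 0.0d};                    // same role as the fold's initial value
        for (double v : flowValues) {
            acc = fold(acc, v);
        }
        // The "FlowOver10" filter fires when the window average exceeds 25.0.
        System.out.println("count=" + acc[0] + " sum=" + acc[1]
            + " avgOver25=" + (average(acc) > 25.0));
    }
}
```

Because the count is incremented before any division happens, the average is never computed for an empty accumulator in this sketch.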


Example data looks like this:

1> 00:36:06.459 1 2121 {"Pressure target - Value":"6", "Pressure target - Unit":"mmHg"}
1> 00:36:06.463 1 2121 {"Flow target - Value":"23", "Flow target - Unit":"l/min"}


Re: Flink CEP AbstractCEPPatternOperator fail after event detection

Till Rohrmann
Hi Norman,

Could you provide an example input data set that produces the error, e.g. the list of strings you wrote to or read from Kafka?

Cheers,
Till


Re: Flink CEP AbstractCEPPatternOperator fail after event detection

norman sp
Hi,
Here is an example input that produces the error. These lines are read from Kafka.

01:43:43.592 1 2121 {"Pressure target - Value":"6"}
01:43:43.596 1 2121 {"Flow target - Value":"23"}
01:43:44.263 1 2121 {"Pressure target - Value":"7"}
01:43:44.972 1 2121 {"Flow target - Value":"24"}
01:43:45.176 1 2121 {"Flow target - Value":"25"}
01:43:45.279 1 2121 {"Flow target - Value":"26"}
01:43:45.382 1 2121 {"Flow target - Value":"27"}
01:43:45.586 1 2121 {"Flow target - Value":"28"}
01:43:46.310 1 2121 {"Pressure target - Value":"7"}
01:43:46.350 1 2121 {"Flow target - Value":"28"}
01:43:48.563 1 2121 {"Pressure target - Value":"7"}
01:43:48.567 1 2121 {"Flow target - Value":"28"}
01:43:51.850 1 2121 {"Pressure target - Value":"7"}
01:43:51.890 1 2121 {"Flow target - Value":"28"}
01:43:53.512 1 2121 {"Pressure target - Value":"7"}
01:43:53.516 1 2121 {"Flow target - Value":"28"}
01:43:56.108 1 2121 {"Pressure target - Value":"7"}
01:43:56.112 1 2121 {"Flow target - Value":"28"}
01:43:58.533 1 2121 {"Pressure target - Value":"7"}
01:43:58.538 1 2121 {"Flow target - Value":"28"}
01:44:01.600 1 2121 {"Pressure target - Value":"7"}
01:44:01.630 1 2121 {"Flow target - Value":"28"}
01:44:03.587 1 2121 {"Pressure target - Value":"7"}
01:44:03.591 1 2121 {"Flow target - Value":"28"}
01:44:06.180 1 2121 {"Pressure target - Value":"7"}
01:44:06.230 1 2121 {"Flow target - Value":"28"}
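
As a worked example of how one of these lines is split, the following plain-Java sketch applies the same steps as the `SplitMapper` posted earlier in the thread to a single-pair message: split on tabs, take field 1 as the device id, and strip quotes and braces from the key and value in field 3. It assumes the raw Kafka message is tab-separated; the class name `InputLineSketch` and the `parse` helper are hypothetical.

```java
// Sketch of the field extraction for a single-pair message.
// Assumes tab-separated input; class and method names are hypothetical.
final class InputLineSketch {

    // Returns {deviceId, parameter name, raw value} for one input line.
    static String[] parse(String line) {
        String[] fields = line.split("\t");
        String deviceId = fields[1];
        // Field 3 holds the JSON-like payload, e.g. {"Pressure target - Value":"6"}
        String[] keyValue = fields[3].split(":");
        String param = keyValue[0].replace("\"", "").replace("{", "").replace("}", "");
        String value = keyValue[1].replace("\"", "").replace("{", "").replace("}", "");
        return new String[] {deviceId, param, value};
    }

    public static void main(String[] args) {
        String line = "01:43:43.592\t1\t2121\t{\"Pressure target - Value\":\"6\"}";
        String[] parsed = parse(line);
        // prints: 1 | Pressure target - Value | 6
        System.out.println(parsed[0] + " | " + parsed[1] + " | " + parsed[2]);
    }
}
```

The colons inside the timestamp do not interfere, because only field 3 is split on ":".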


Greetz Norman

Re: Flink CEP AbstractCEPPatternOperator fail after event detection

Till Rohrmann-2
Hi Norman,

Sorry for the late reply. I finally found time and could, thanks to you, reproduce the problem. The cause was that the window borders were treated differently in two parts of the code. Now the left border of a window is inclusive and the right border (late elements) is exclusive. I've already committed a fix for this problem and tested it with your example data. If you use the latest master version, your CEP example will now work :-)
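
The border convention described here can be sketched in plain Java as a half-open interval [start, start + length). This is only an illustration of the idea, not the actual fix in Flink; the names `WindowBorders`, `inWindow`, and `canPrune` are hypothetical. The point is that the membership check and the pruning check use the same border, so at no timestamp is an event both accepted and already pruned.

```java
// Half-open window [start, start + length): left border inclusive, right exclusive.
// Illustrative only; not Flink's code. All names are hypothetical.
final class WindowBorders {

    // True if an event at `timestamp` still belongs to the window.
    static boolean inWindow(long start, long length, long timestamp) {
        return timestamp >= start && timestamp < start + length;
    }

    // True once the window has fully passed and its partial match may be dropped.
    static boolean canPrune(long start, long length, long currentTime) {
        return currentTime >= start + length;
    }

    public static void main(String[] args) {
        long start = 1_000L;
        long length = 10_000L;
        for (long t : new long[] {1_000L, 10_999L, 11_000L}) {
            System.out.println(t + " inWindow=" + inWindow(start, length, t)
                + " canPrune=" + canPrune(start, length, t));
        }
    }
}
```

If one of the two checks used an inclusive right border while the other used an exclusive one, the two would disagree exactly at `start + length`, which is the kind of mismatch described above.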

Thanks for reporting the problem.

Cheers,
Till
