StreamCorruptedException

classic Classic list List threaded Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view
|

StreamCorruptedException

Sridhar Chellappa
I am running Flink 1.3.0 against Kafka 0.10. I managed to bring the flink cluster up and have been running my flink CEP job for more than 3 hours when I see the following exception :

The messages consumed from Kafka are protobuf messages and I use a protobuf serializer. i have no clue as to where is this exception coming from. Can someone help?



java.lang.IllegalStateException: Could not initialize keyed state backend.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:675)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:662)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.StreamCorruptedException: invalid type code: 00
    at java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2828)
    at java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2862)
    at java.io.ObjectInputStream$BlockDataInputStream.skipBlockData(ObjectInputStream.java:2764)
    at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:2196)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1838)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1203)
    at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1161)
    at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:948)
    at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:839)
    at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:473)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:354)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:771)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
    ... 6 more
Reply | Threaded
Open this post in threaded view
|

Re: StreamCorruptedException

Tzu-Li (Gordon) Tai
Hi Sridhar,

Sorry that this didn't get a response earlier.

According to the trace, it seems like the job failed during the process, and
when trying to automatically restore from a checkpoint, deserialization of a
CEP `IterativeCondition` object failed. As far as I can tell, CEP operators
are just using Java serialization on CEP `IterativeCondition` objects, so
should not be related to the protobuf serializer that you are using.

Is this still constantly happening for you?

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: StreamCorruptedException

Sridhar Chellappa
Thanks for the reply. Well, tracing back to the root cause, I see the following:

1. At the Job manager, the Checkpoint times are getting worse :

Jobmanager :

Checkpoint times are getting worse progressively.

2017-09-16 05:05:50,813 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 1 @ 1505538350809
2017-09-16 05:05:51,396 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 1 (11101233 bytes in 586 ms).
2017-09-16 05:07:30,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 2 @ 1505538450809
2017-09-16 05:07:31,657 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 2 (18070955 bytes in 583 ms).

                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
2017-09-16 07:32:58,117 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 89 (246125113 bytes in 27194 ms).
2017-09-16 07:34:10,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 90 @ 1505547250809
2017-09-16 07:34:44,932 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 90 (248272325 bytes in 34012 ms).
2017-09-16 07:35:50,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 91 @ 1505547350809
2017-09-16 07:36:37,058 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 91 (250348812 bytes in 46136 ms).
2017-09-16 07:37:30,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 92 @ 1505547450809
2017-09-16 07:38:18,076 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 92 (252399724 bytes in 47152 ms).
2017-09-16 07:39:10,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 93 @ 1505547550809
2017-09-16 07:40:13,494 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 93 (254374636 bytes in 62573 ms).
2017-09-16 07:40:50,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 94 @ 1505547650809
2017-09-16 07:42:42,850 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 94 (256386533 bytes in 111898 ms).
2017-09-16 07:42:42,850 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 95 @ 1505547762850
2017-09-16 07:46:06,241 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 95 (258441766 bytes in 203268 ms).
2017-09-16 07:46:06,241 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 96 @ 1505547966241
2017-09-16 07:48:42,069 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - KeyedCEPPatternOperator -> Map (1/4) (ff835faa9eb9182ed2f2230a1e5cc56d) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 96 for operator KeyedCEPPatternOperator -> Map (1/4).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 96 for operator KeyedCEPPatternOperator -> Map (1/4).
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
    ... 5 more


So, it looks like the Job Manager ran out of memory, thanks to the "Progressively Getting Worse" checkpoints. Any ideas on how to make sure the checkpoints faster?

                                                         




On Thu, Sep 21, 2017 at 7:29 PM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi Sridhar,

Sorry that this didn't get a response earlier.

According to the trace, it seems like the job failed during the process, and
when trying to automatically restore from a checkpoint, deserialization of a
CEP `IterativeCondition` object failed. As far as I can tell, CEP operators
are just using Java serialization on CEP `IterativeCondition` objects, so
should not be related to the protobuf serializer that you are using.

Is this still constantly happening for you?

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply | Threaded
Open this post in threaded view
|

Re: StreamCorruptedException

Tzu-Li (Gordon) Tai
I talked a bit with Kostas on what may be happening here.

It could be that your patterns are not closing, which depends on the pattern construction of your CEP job.
Could you perhaps provide an overview / code snippet of what your CEP job is doing?

Looping Kostas (in CC) also to this thread as he may have a better idea what is happening here.

Cheers,
Gordon

On 22 September 2017 at 4:09:07 PM, Sridhar Chellappa ([hidden email]) wrote:

Thanks for the reply. Well, tracing back to the root cause, I see the following:

1. At the Job manager, the Checkpoint times are getting worse :

Jobmanager :

Checkpoint times are getting worse progressively.

2017-09-16 05:05:50,813 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 1 @ 1505538350809
2017-09-16 05:05:51,396 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 1 (11101233 bytes in 586 ms).
2017-09-16 05:07:30,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 2 @ 1505538450809
2017-09-16 05:07:31,657 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 2 (18070955 bytes in 583 ms).

                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
2017-09-16 07:32:58,117 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 89 (246125113 bytes in 27194 ms).
2017-09-16 07:34:10,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 90 @ 1505547250809
2017-09-16 07:34:44,932 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 90 (248272325 bytes in 34012 ms).
2017-09-16 07:35:50,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 91 @ 1505547350809
2017-09-16 07:36:37,058 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 91 (250348812 bytes in 46136 ms).
2017-09-16 07:37:30,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 92 @ 1505547450809
2017-09-16 07:38:18,076 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 92 (252399724 bytes in 47152 ms).
2017-09-16 07:39:10,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 93 @ 1505547550809
2017-09-16 07:40:13,494 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 93 (254374636 bytes in 62573 ms).
2017-09-16 07:40:50,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 94 @ 1505547650809
2017-09-16 07:42:42,850 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 94 (256386533 bytes in 111898 ms).
2017-09-16 07:42:42,850 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 95 @ 1505547762850
2017-09-16 07:46:06,241 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 95 (258441766 bytes in 203268 ms).
2017-09-16 07:46:06,241 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 96 @ 1505547966241
2017-09-16 07:48:42,069 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - KeyedCEPPatternOperator -> Map (1/4) (ff835faa9eb9182ed2f2230a1e5cc56d) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 96 for operator KeyedCEPPatternOperator -> Map (1/4).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 96 for operator KeyedCEPPatternOperator -> Map (1/4).
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
    ... 5 more


So, it looks like the Job Manager ran out of memory, thanks to the "Progressively Getting Worse" checkpoints. Any ideas on how to make sure the checkpoints faster?

                                                         




On Thu, Sep 21, 2017 at 7:29 PM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi Sridhar,

Sorry that this didn't get a response earlier.

According to the trace, it seems like the job failed during the process, and
when trying to automatically restore from a checkpoint, deserialization of a
CEP `IterativeCondition` object failed. As far as I can tell, CEP operators
are just using Java serialization on CEP `IterativeCondition` objects, so
should not be related to the protobuf serializer that you are using.

Is this still constantly happening for you?

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply | Threaded
Open this post in threaded view
|

Re: StreamCorruptedException

Sridhar Chellappa
Here is the snippet :

public interface Rule {
    DataStream<Alert> run();
}

public class Rule1 implements Rule {

    private static final String RULE_ID = "Rule1"

    @Override
    public DataStream<Alert> run() {


        Pattern<MyMessage1, ?> MyMessage1Pattern =
                Pattern.<MyMessage1>begin("first").
                        subtype(MyMessage1.class).
                        next("second").
                        subtype(MyMessage1.class).
                        within(Time.minutes(15);

        PatternStream<MyMessage1> MyMessage1PatternStream =
                CEP.pattern(
                        MyMessage1DataStream.keyBy("field1", "field2"),
                        MyMessage1Pattern
                );

       return (MyMessage1PatternStream.select(
                new PatternSelectFunction<MyMessage1, Alert>() {
                    @Override
                    public Alert select(Map<String, List<MyMessage1>> pattern) throws Exception {

                        String alertMessage = String.format("Cep Alert. Rule ID : %s" RULE_ID);

                        return new CEPAlert(alertMessage);
                    }
                }
            )
        );

    }



    private static List<Rule> getStream1RulesToExecute(DataStream<MyMessage1> MyMessage1DataStream) {
        List<Rule> rules = new ArrayList<Rule>();

        rules.add(new Rule1(MyMessage1DataStream));

        return rules;
    }

   
    private static List<Rule> getStream2RulesToExecute(DataStream<MyMessage1> MyMessage1DataStream) {
        List<Rule> rules = new ArrayList<Rule>();

        rules.add(new Rule2(MyMessage1DataStream));
        return rules;
    }


   public RichParallelSourceFunction<MyMessage1> getStreamSource1(StreamExecutionEnvironment env, ParameterTool parameterTool) {


        env.enableCheckpointing(parameterTool.getInt(CHECKPOINT_INTERVAL, DEFAULT_CHECKPOINT_INTERVAL));
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS);
        env.getCheckpointConfig().setCheckpointTimeout(CheckpointConfig.DEFAULT_TIMEOUT);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);


        KafkaDataSource<T> flinkCepConsumer =
                new KafkaDataSource<MyMessage1>(parameterTool, new MyMessage1SerDeSchema());

        return flinkCepConsumer;
    }


   public RichParallelSourceFunction<MyMessage2> getStreamSource2(StreamExecutionEnvironment env, ParameterTool parameterTool) {


        env.enableCheckpointing(parameterTool.getInt(CHECKPOINT_INTERVAL, DEFAULT_CHECKPOINT_INTERVAL));
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS);
        env.getCheckpointConfig().setCheckpointTimeout(CheckpointConfig.DEFAULT_TIMEOUT);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);


        KafkaDataSource<T> flinkCepConsumer =
                new KafkaDataSource<MyMessage1>(parameterTool, new MyMessage2SerDeSchema());

        return flinkCepConsumer;
    }


    public static void main(String[] args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(args[PROPS_FILE_ARG_INDEX]);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.getConfig().setGlobalJobParameters(parameterTool);

        DataStream<MyMessage1> message1Stream = env.addSource(
            getStreamSource1(env, parameterTool);
        );


        DataStream<MyMessge2> message2Stream = env.addSource(
            getStreamSource2(env, parameterTool);
        );


        getStream1RulesToExecute(message1Stream).forEach(rule -> rule.run().print());
        getStream2RulesToExecute(message2tream).forEach(rule -> rule.run().print());
        env.execute(STREAMING_JOB_NAME);
    }






On Mon, Sep 25, 2017 at 3:13 PM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
I talked a bit with Kostas on what may be happening here.

It could be that your patterns are not closing, which depends on the pattern construction of your CEP job.
Could you perhaps provide an overview / code snippet of what your CEP job is doing?

Looping Kostas (in CC) also to this thread as he may have a better idea what is happening here.

Cheers,
Gordon

On 22 September 2017 at 4:09:07 PM, Sridhar Chellappa ([hidden email]) wrote:

Thanks for the reply. Well, tracing back to the root cause, I see the following:

1. At the Job manager, the Checkpoint times are getting worse :

Jobmanager :

Checkpoint times are getting worse progressively.

2017-09-16 05:05:50,813 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 1 @ 1505538350809
2017-09-16 05:05:51,396 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 1 (11101233 bytes in 586 ms).
2017-09-16 05:07:30,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 2 @ 1505538450809
2017-09-16 05:07:31,657 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 2 (18070955 bytes in 583 ms).

                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
2017-09-16 07:32:58,117 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 89 (246125113 bytes in 27194 ms).
2017-09-16 07:34:10,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 90 @ 1505547250809
2017-09-16 07:34:44,932 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 90 (248272325 bytes in 34012 ms).
2017-09-16 07:35:50,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 91 @ 1505547350809
2017-09-16 07:36:37,058 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 91 (250348812 bytes in 46136 ms).
2017-09-16 07:37:30,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 92 @ 1505547450809
2017-09-16 07:38:18,076 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 92 (252399724 bytes in 47152 ms).
2017-09-16 07:39:10,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 93 @ 1505547550809
2017-09-16 07:40:13,494 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 93 (254374636 bytes in 62573 ms).
2017-09-16 07:40:50,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 94 @ 1505547650809
2017-09-16 07:42:42,850 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 94 (256386533 bytes in 111898 ms).
2017-09-16 07:42:42,850 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 95 @ 1505547762850
2017-09-16 07:46:06,241 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 95 (258441766 bytes in 203268 ms).
2017-09-16 07:46:06,241 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 96 @ 1505547966241
2017-09-16 07:48:42,069 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - KeyedCEPPatternOperator -> Map (1/4) (ff835faa9eb9182ed2f2230a1e5cc56d) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 96 for operator KeyedCEPPatternOperator -> Map (1/4).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 96 for operator KeyedCEPPatternOperator -> Map (1/4).
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
    ... 5 more


So, it looks like the Job Manager ran out of memory, thanks to the "Progressively Getting Worse" checkpoints. Any ideas on how to make sure the checkpoints faster?

                                                         




On Thu, Sep 21, 2017 at 7:29 PM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi Sridhar,

Sorry that this didn't get a response earlier.

According to the trace, it seems like the job failed during the process, and
when trying to automatically restore from a checkpoint, deserialization of a
CEP `IterativeCondition` object failed. As far as I can tell, CEP operators
are just using Java serialization on CEP `IterativeCondition` objects, so
should not be related to the protobuf serializer that you are using.

Is this still constantly happening for you?

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Reply | Threaded
Open this post in threaded view
|

Re: StreamCorruptedException

Sridhar Chellappa
One more point to add.

I disabled checkpoints (by commenting out code that calls enableCheckpointing()) and re-ran the job this time with plenty of memory to the job manager

~/flink-1.3.2/bin/yarn-session.sh -n 4 -jm 24576 -tm 24576 -s 2 -d

At the Jobmanager, I am still hitting:

2017-09-25 06:46:44,066 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner             -  Starting YARN ApplicationMaster / ResourceManager / JobManager (Version: 1.3.2, Rev:0399bee, Date:03.08.2017 @ 10:23:11 UTC)
2017-09-25 06:46:44,066 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner             -  Current user: flink
2017-09-25 06:46:44,066 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner             -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.131-b11
2017-09-25 06:46:44,066 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner             -  Maximum heap size: 16384 MiBytes
2017-09-25 06:46:44,066 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner             -  JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
2017-09-25 06:46:44,067 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner             -  Hadoop version: 2.7.2
2017-09-25 06:46:44,067 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner             -  JVM Options:
2017-09-25 06:46:44,067 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner             -     -Xmx18432m
2017-09-25 06:46:44,067 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner             -     -Dlog.file=/var/log/hadoop-yarn/userlogs/application_1506317793012_0001/container_1506317793012_0001_01_000001/jobmanager.log
2017-09-25 06:46:44,067 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner             -     -Dlogback.configurationFile=file:logback.xml
2017-09-25 06:46:44,067 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner             -     -Dlog4j.configuration=file:log4j.properties
2017-09-25 06:46:44,067 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner             -  Program Arguments: (none)

                                                                                 .
                                                                                 .

2017-09-25 06:50:51,925 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> (Filter -> Map -> Map, Filter -> Map -> Map, Filter -> Map -> Map, Map -> (Filter, Filter)) (2/2) (e27860984c858738f044931e4b6a86a6) switched from DEPLOYING to RUNNING.
2017-09-25 13:38:54,175 INFO  org.apache.flink.runtime.blob.BlobCache                       - Created BLOB cache storage directory /tmp/blobStore-3e0b96a1-904b-4acb-b0d3-9d88f2073e97
2017-09-25 13:38:54,187 INFO  org.apache.flink.runtime.blob.BlobCache                       - Downloading 49efe0ad58b727ba145b86df6088111c9a90ddd6 from localhost/127.0.0.1:55550
2017-09-25 16:30:39,974 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - KeyedCEPPatternOperator -> Map (2/2) (e464ec796cd239a7b7fa225aaf86309a) switched from RUNNING to CANCELED.
2017-09-25 16:30:39,975 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> (Filter -> Map -> Map, Filter -> Map -> Map, Filter -> Map -> Map, Map -> (Filter, Filter)) (2/2) (e27860984c858738f044931e4b6a86a6) switched from RUNNING to FAILED.
java.lang.OutOfMemoryError: GC overhead limit exceeded

        at sun.reflect.GeneratedConstructorAccessor10.newInstance(Unknown Source)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at com.twitter.chill.Instantiators$$anonfun$normalJava$1.apply(KryoBase.scala:160)
        at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:123)
        at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.createCopy(FieldSerializer.java:620)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:624)
        at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:239)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
        at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)


On Wed, Sep 27, 2017 at 8:34 AM, Sridhar Chellappa <[hidden email]> wrote:
Here is the snippet :

public interface Rule {
    DataStream<Alert> run();
}

public class Rule1 implements Rule {

    private static final String RULE_ID = "Rule1"

    @Override
    public DataStream<Alert> run() {


        Pattern<MyMessage1, ?> MyMessage1Pattern =
                Pattern.<MyMessage1>begin("first").
                        subtype(MyMessage1.class).
                        next("second").
                        subtype(MyMessage1.class).
                        within(Time.minutes(15);

        PatternStream<MyMessage1> MyMessage1PatternStream =
                CEP.pattern(
                        MyMessage1DataStream.keyBy("field1", "field2"),
                        MyMessage1Pattern
                );

       return (MyMessage1PatternStream.select(
                new PatternSelectFunction<MyMessage1, Alert>() {
                    @Override
                    public Alert select(Map<String, List<MyMessage1>> pattern) throws Exception {

                        String alertMessage = String.format("Cep Alert. Rule ID : %s" RULE_ID);

                        return new CEPAlert(alertMessage);
                    }
                }
            )
        );

    }



    private static List<Rule> getStream1RulesToExecute(DataStream<MyMessage1> MyMessage1DataStream) {
        List<Rule> rules = new ArrayList<Rule>();

        rules.add(new Rule1(MyMessage1DataStream));

        return rules;
    }

   
    private static List<Rule> getStream2RulesToExecute(DataStream<MyMessage1> MyMessage1DataStream) {
        List<Rule> rules = new ArrayList<Rule>();

        rules.add(new Rule2(MyMessage1DataStream));
        return rules;
    }


   public RichParallelSourceFunction<MyMessage1> getStreamSource1(StreamExecutionEnvironment env, ParameterTool parameterTool) {


        env.enableCheckpointing(parameterTool.getInt(CHECKPOINT_INTERVAL, DEFAULT_CHECKPOINT_INTERVAL));
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS);
        env.getCheckpointConfig().setCheckpointTimeout(CheckpointConfig.DEFAULT_TIMEOUT);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);


        KafkaDataSource<T> flinkCepConsumer =
                new KafkaDataSource<MyMessage1>(parameterTool, new MyMessage1SerDeSchema());

        return flinkCepConsumer;
    }


   public RichParallelSourceFunction<MyMessage2> getStreamSource2(StreamExecutionEnvironment env, ParameterTool parameterTool) {


        env.enableCheckpointing(parameterTool.getInt(CHECKPOINT_INTERVAL, DEFAULT_CHECKPOINT_INTERVAL));
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS);
        env.getCheckpointConfig().setCheckpointTimeout(CheckpointConfig.DEFAULT_TIMEOUT);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);


        KafkaDataSource<T> flinkCepConsumer =
                new KafkaDataSource<MyMessage1>(parameterTool, new MyMessage2SerDeSchema());

        return flinkCepConsumer;
    }


    public static void main(String[] args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(args[PROPS_FILE_ARG_INDEX]);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.getConfig().setGlobalJobParameters(parameterTool);

        DataStream<MyMessage1> message1Stream = env.addSource(
            getStreamSource1(env, parameterTool);
        );


        DataStream<MyMessge2> message2Stream = env.addSource(
            getStreamSource2(env, parameterTool);
        );


        getStream1RulesToExecute(message1Stream).forEach(rule -> rule.run().print());
        getStream2RulesToExecute(message2tream).forEach(rule -> rule.run().print());
        env.execute(STREAMING_JOB_NAME);
    }






On Mon, Sep 25, 2017 at 3:13 PM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
I talked a bit with Kostas on what may be happening here.

It could be that your patterns are not closing, which depends on the pattern construction of your CEP job.
Could you perhaps provide an overview / code snippet of what your CEP job is doing?

Looping Kostas (in CC) also to this thread as he may have a better idea what is happening here.

Cheers,
Gordon

On 22 September 2017 at 4:09:07 PM, Sridhar Chellappa ([hidden email]) wrote:

Thanks for the reply. Well, tracing back to the root cause, I see the following:

1. At the Job manager, the Checkpoint times are getting worse :

Jobmanager :

Checkpoint times are getting worse progressively.

2017-09-16 05:05:50,813 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 1 @ 1505538350809
2017-09-16 05:05:51,396 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 1 (11101233 bytes in 586 ms).
2017-09-16 05:07:30,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 2 @ 1505538450809
2017-09-16 05:07:31,657 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 2 (18070955 bytes in 583 ms).

                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
2017-09-16 07:32:58,117 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 89 (246125113 bytes in 27194 ms).
2017-09-16 07:34:10,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 90 @ 1505547250809
2017-09-16 07:34:44,932 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 90 (248272325 bytes in 34012 ms).
2017-09-16 07:35:50,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 91 @ 1505547350809
2017-09-16 07:36:37,058 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 91 (250348812 bytes in 46136 ms).
2017-09-16 07:37:30,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 92 @ 1505547450809
2017-09-16 07:38:18,076 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 92 (252399724 bytes in 47152 ms).
2017-09-16 07:39:10,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 93 @ 1505547550809
2017-09-16 07:40:13,494 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 93 (254374636 bytes in 62573 ms).
2017-09-16 07:40:50,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 94 @ 1505547650809
2017-09-16 07:42:42,850 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 94 (256386533 bytes in 111898 ms).
2017-09-16 07:42:42,850 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 95 @ 1505547762850
2017-09-16 07:46:06,241 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 95 (258441766 bytes in 203268 ms).
2017-09-16 07:46:06,241 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 96 @ 1505547966241
2017-09-16 07:48:42,069 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - KeyedCEPPatternOperator -> Map (1/4) (ff835faa9eb9182ed2f2230a1e5cc56d) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 96 for operator KeyedCEPPatternOperator -> Map (1/4).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 96 for operator KeyedCEPPatternOperator -> Map (1/4).
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
    ... 5 more


So, it looks like the Job Manager ran out of memory, thanks to the "Progressively Getting Worse" checkpoints. Any ideas on how to make sure the checkpoints faster?

                                                         




On Thu, Sep 21, 2017 at 7:29 PM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi Sridhar,

Sorry that this didn't get a response earlier.

According to the trace, it seems like the job failed during the process, and
when trying to automatically restore from a checkpoint, deserialization of a
CEP `IterativeCondition` object failed. As far as I can tell, CEP operators
are just using Java serialization on CEP `IterativeCondition` objects, so
should not be related to the protobuf serializer that you are using.

Is this still constantly happening for you?

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Reply | Threaded
Open this post in threaded view
|

Re: StreamCorruptedException

Kostas Kloudas
Hi Sridhar,

From looking at your code:

1) The “KafkaDataSource” is a custom source that you implemented? Does this source buffer anything?
2) The getStreamSource2 seems to return again a "new KafkaDataSource<MyMessage1>”. Can this be a problem?
3) You are working on processing time and you are simply detecting if 2 messages of the same type came within 15min right? 
I suppose that this could also be implemented using the times() quantifier, but this is just a matter of taste.
Could you reduce this to a smaller duration and see if you still get a corrupted stream exception?

Thanks,
Kostas

On Sep 27, 2017, at 5:42 AM, Sridhar Chellappa <[hidden email]> wrote:

One more point to add.

I disabled checkpoints (by commenting out code that calls enableCheckpointing()) and re-ran the job this time with plenty of memory to the job manager

~/flink-1.3.2/bin/yarn-session.sh -n 4 -jm 24576 -tm 24576 -s 2 -d

At the Jobmanager, I am still hitting:

2017-09-25 06:46:44,066 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner             -  Starting YARN ApplicationMaster / ResourceManager / JobManager (Version: 1.3.2, Rev:0399bee, Date:03.08.2017 @ 10:23:11 UTC)
2017-09-25 06:46:44,066 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner             -  Current user: flink
2017-09-25 06:46:44,066 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner             -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.131-b11
2017-09-25 06:46:44,066 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner             -  Maximum heap size: 16384 MiBytes
2017-09-25 06:46:44,066 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner             -  JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
2017-09-25 06:46:44,067 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner             -  Hadoop version: 2.7.2
2017-09-25 06:46:44,067 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner             -  JVM Options:
2017-09-25 06:46:44,067 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner             -     -Xmx18432m
2017-09-25 06:46:44,067 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner             -     -Dlog.file=/var/log/hadoop-yarn/userlogs/application_1506317793012_0001/container_1506317793012_0001_01_000001/jobmanager.log
2017-09-25 06:46:44,067 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner             -     -Dlogback.configurationFile=file:logback.xml
2017-09-25 06:46:44,067 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner             -     -Dlog4j.configuration=file:log4j.properties
2017-09-25 06:46:44,067 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner             -  Program Arguments: (none)

                                                                                 .
                                                                                 .

2017-09-25 06:50:51,925 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> (Filter -> Map -> Map, Filter -> Map -> Map, Filter -> Map -> Map, Map -> (Filter, Filter)) (2/2) (e27860984c858738f044931e4b6a86a6) switched from DEPLOYING to RUNNING.
2017-09-25 13:38:54,175 INFO  org.apache.flink.runtime.blob.BlobCache                       - Created BLOB cache storage directory /tmp/blobStore-3e0b96a1-904b-4acb-b0d3-9d88f2073e97
2017-09-25 13:38:54,187 INFO  org.apache.flink.runtime.blob.BlobCache                       - Downloading 49efe0ad58b727ba145b86df6088111c9a90ddd6 from localhost/127.0.0.1:55550
2017-09-25 16:30:39,974 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - KeyedCEPPatternOperator -> Map (2/2) (e464ec796cd239a7b7fa225aaf86309a) switched from RUNNING to CANCELED.
2017-09-25 16:30:39,975 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> (Filter -> Map -> Map, Filter -> Map -> Map, Filter -> Map -> Map, Map -> (Filter, Filter)) (2/2) (e27860984c858738f044931e4b6a86a6) switched from RUNNING to FAILED.
java.lang.OutOfMemoryError: GC overhead limit exceeded

        at sun.reflect.GeneratedConstructorAccessor10.newInstance(Unknown Source)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at com.twitter.chill.Instantiators$$anonfun$normalJava$1.apply(KryoBase.scala:160)
        at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:123)
        at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.createCopy(FieldSerializer.java:620)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:624)
        at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:239)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
        at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)


On Wed, Sep 27, 2017 at 8:34 AM, Sridhar Chellappa <[hidden email]> wrote:
Here is the snippet :

public interface Rule {
    DataStream<Alert> run();
}

public class Rule1 implements Rule {

    private static final String RULE_ID = "Rule1"

    @Override
    public DataStream<Alert> run() {


        Pattern<MyMessage1, ?> MyMessage1Pattern =
                Pattern.<MyMessage1>begin("first").
                        subtype(MyMessage1.class).
                        next("second").
                        subtype(MyMessage1.class).
                        within(Time.minutes(15);

        PatternStream<MyMessage1> MyMessage1PatternStream =
                CEP.pattern(
                        MyMessage1DataStream.keyBy("field1", "field2"),
                        MyMessage1Pattern
                );

       return (MyMessage1PatternStream.select(
                new PatternSelectFunction<MyMessage1, Alert>() {
                    @Override
                    public Alert select(Map<String, List<MyMessage1>> pattern) throws Exception {

                        String alertMessage = String.format("Cep Alert. Rule ID : %s" RULE_ID);

                        return new CEPAlert(alertMessage);
                    }
                }
            )
        );

    }



    private static List<Rule> getStream1RulesToExecute(DataStream<MyMessage1> MyMessage1DataStream) {
        List<Rule> rules = new ArrayList<Rule>();

        rules.add(new Rule1(MyMessage1DataStream));

        return rules;
    }

   
    private static List<Rule> getStream2RulesToExecute(DataStream<MyMessage1> MyMessage1DataStream) {
        List<Rule> rules = new ArrayList<Rule>();

        rules.add(new Rule2(MyMessage1DataStream));
        return rules;
    }


   public RichParallelSourceFunction<MyMessage1> getStreamSource1(StreamExecutionEnvironment env, ParameterTool parameterTool) {


        env.enableCheckpointing(parameterTool.getInt(CHECKPOINT_INTERVAL, DEFAULT_CHECKPOINT_INTERVAL));
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS);
        env.getCheckpointConfig().setCheckpointTimeout(CheckpointConfig.DEFAULT_TIMEOUT);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);


        KafkaDataSource<T> flinkCepConsumer =
                new KafkaDataSource<MyMessage1>(parameterTool, new MyMessage1SerDeSchema());

        return flinkCepConsumer;
    }


   public RichParallelSourceFunction<MyMessage2> getStreamSource2(StreamExecutionEnvironment env, ParameterTool parameterTool) {


        env.enableCheckpointing(parameterTool.getInt(CHECKPOINT_INTERVAL, DEFAULT_CHECKPOINT_INTERVAL));
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS);
        env.getCheckpointConfig().setCheckpointTimeout(CheckpointConfig.DEFAULT_TIMEOUT);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);


        KafkaDataSource<T> flinkCepConsumer =
                new KafkaDataSource<MyMessage1>(parameterTool, new MyMessage2SerDeSchema());

        return flinkCepConsumer;
    }


    public static void main(String[] args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(args[PROPS_FILE_ARG_INDEX]);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.getConfig().setGlobalJobParameters(parameterTool);

        DataStream<MyMessage1> message1Stream = env.addSource(
            getStreamSource1(env, parameterTool);
        );


        DataStream<MyMessge2> message2Stream = env.addSource(
            getStreamSource2(env, parameterTool);
        );


        getStream1RulesToExecute(message1Stream).forEach(rule -> rule.run().print());
        getStream2RulesToExecute(message2tream).forEach(rule -> rule.run().print());
        env.execute(STREAMING_JOB_NAME);
    }






On Mon, Sep 25, 2017 at 3:13 PM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
I talked a bit with Kostas on what may be happening here.

It could be that your patterns are not closing, which depends on the pattern construction of your CEP job.
Could you perhaps provide an overview / code snippet of what your CEP job is doing?

Looping Kostas (in CC) also to this thread as he may have a better idea what is happening here.

Cheers,
Gordon

On 22 September 2017 at 4:09:07 PM, Sridhar Chellappa ([hidden email]) wrote:

Thanks for the reply. Well, tracing back to the root cause, I see the following:

1. At the Job manager, the Checkpoint times are getting worse :

Jobmanager :

Checkpoint times are getting worse progressively.

2017-09-16 05:05:50,813 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 1 @ 1505538350809
2017-09-16 05:05:51,396 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 1 (11101233 bytes in 586 ms).
2017-09-16 05:07:30,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 2 @ 1505538450809
2017-09-16 05:07:31,657 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 2 (18070955 bytes in 583 ms).

                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
2017-09-16 07:32:58,117 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 89 (246125113 bytes in 27194 ms).
2017-09-16 07:34:10,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 90 @ 1505547250809
2017-09-16 07:34:44,932 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 90 (248272325 bytes in 34012 ms).
2017-09-16 07:35:50,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 91 @ 1505547350809
2017-09-16 07:36:37,058 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 91 (250348812 bytes in 46136 ms).
2017-09-16 07:37:30,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 92 @ 1505547450809
2017-09-16 07:38:18,076 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 92 (252399724 bytes in 47152 ms).
2017-09-16 07:39:10,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 93 @ 1505547550809
2017-09-16 07:40:13,494 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 93 (254374636 bytes in 62573 ms).
2017-09-16 07:40:50,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 94 @ 1505547650809
2017-09-16 07:42:42,850 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 94 (256386533 bytes in 111898 ms).
2017-09-16 07:42:42,850 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 95 @ 1505547762850
2017-09-16 07:46:06,241 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 95 (258441766 bytes in 203268 ms).
2017-09-16 07:46:06,241 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 96 @ 1505547966241
2017-09-16 07:48:42,069 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - KeyedCEPPatternOperator -> Map (1/4) (ff835faa9eb9182ed2f2230a1e5cc56d) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 96 for operator KeyedCEPPatternOperator -> Map (1/4).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 96 for operator KeyedCEPPatternOperator -> Map (1/4).
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
    ... 5 more


So, it looks like the Job Manager ran out of memory, thanks to the "Progressively Getting Worse" checkpoints. Any ideas on how to make sure the checkpoints faster?

                                                         




On Thu, Sep 21, 2017 at 7:29 PM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi Sridhar,

Sorry that this didn't get a response earlier.

According to the trace, it seems like the job failed during the process, and
when trying to automatically restore from a checkpoint, deserialization of a
CEP `IterativeCondition` object failed. As far as I can tell, CEP operators
are just using Java serialization on CEP `IterativeCondition` objects, so
should not be related to the protobuf serializer that you are using.

Is this still constantly happening for you?

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/