FlinkCEP, circular references and checkpointing failures

FlinkCEP, circular references and checkpointing failures

Federico D'Ambrosio
Hello everyone,

I'm experimenting a bit with FlinkCEP and I'm seeing strange checkpoint failures: they appear when the window of a within clause closes at the same time as a checkpoint occurs (synchronous checkpoints, with both the Fs and RocksDB state backends, stored in HDFS).

The following is the relevant code:

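import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60000) // Checkpoint every minute (interval in milliseconds)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints-dir"))

// Pattern
val pattern =
  Pattern
    .begin[EventWithId]("flying").oneOrMore.where(_.event.instantValues.altitude >= 37000)
    .notNext("disappearing").where(_.event.instantValues.altitude >= 37000).within(Time.minutes(1))

// Associate the KeyedStream (streamById, defined elsewhere in the job) with the pattern to be detected
val patternStream = CEP.pattern(streamById, pattern)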


which causes failure on the second checkpoint with the following exception stack trace:

AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).}
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).
        ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
        ... 5 more
        Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
                at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
                ... 5 more
        Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
                at java.util.concurrent.FutureTask.report(FutureTask.java:122)
                at java.util.concurrent.FutureTask.get(FutureTask.java:192)
                at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
                at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
                at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
                ... 7 more
        Caused by: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
                at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
                at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:971)
                at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:838)
                at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:928)
                at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:852)
                at org.apache.flink.runtime.state.heap.NestedMapsStateTable$NestedMapsStateTableSnapshot.writeMappingsInKeyGroup(NestedMapsStateTable.java:355)
                at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:347)
                at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329)
                at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
                at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:372)
                at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
                at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
                at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:228)
                at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
                at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
                at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
                ... 1 more
        [CIRCULAR REFERENCE:java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)]

11/03/2017 13:46:46     Job execution switched to status FAILING.
11/03/2017 13:46:46     Source: json-parser(1/1) switched to CANCELING
11/03/2017 13:46:46     Process(1/1) switched to CANCELING
11/03/2017 13:46:46     Source: json-parser(1/1) switched to CANCELED
11/03/2017 13:46:46     Process(1/1) switched to CANCELED
11/03/2017 13:46:46     Job execution switched to status RESTARTING.
11/03/2017 13:46:56     Job execution switched to status CREATED.
11/03/2017 13:46:56     Job execution switched to status RUNNING.
11/03/2017 13:46:56     Source: json-parser(1/1) switched to SCHEDULED
11/03/2017 13:46:56     Process(1/1) switched to SCHEDULED
11/03/2017 13:46:56     KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to SCHEDULED
11/03/2017 13:46:56     Source: json-parser(1/1) switched to DEPLOYING
11/03/2017 13:46:56     Process(1/1) switched to DEPLOYING
11/03/2017 13:46:56     KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to DEPLOYING
11/03/2017 13:46:56     Process(1/1) switched to RUNNING
11/03/2017 13:46:56     KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to RUNNING
11/03/2017 13:46:56     Source: json-parser(1/1) switched to RUNNING
11/03/2017 13:46:57     KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to FAILED
java.lang.IllegalStateException: Could not initialize keyed state backend.
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.StreamCorruptedException: invalid type code: 00
        at java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2519)
        at java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2553)
        at java.io.ObjectInputStream$BlockDataInputStream.skipBlockData(ObjectInputStream.java:2455)
        at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1951)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1621)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:852)
        at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:518)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
        ... 6 more

11/03/2017 13:46:57     Job execution switched to status FAILING.



What is happening here? Am I doing something wrong? Is there some sort of conflict between the deadline of a within clause and the checkpoint deadline?

I found the following similar JIRA pages, but none of those mention circular references: https://issues.apache.org/jira/browse/FLINK-6321

Kind Regards,
Federico D'Ambrosio

Re: FlinkCEP, circular references and checkpointing failures

Federico D'Ambrosio
I'm sorry, I realized that the stack trace was poorly formatted. Here it is with better formatting:

AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).}
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).
        ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
        ... 5 more
        Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
                at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
                ... 5 more
        Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
                at java.util.concurrent.FutureTask.report(FutureTask.java:122)
                at java.util.concurrent.FutureTask.get(FutureTask.java:192)
                at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
                at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
                at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
                ... 7 more
        Caused by: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
                at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
                at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:971)
                at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:838)
                at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:928)
                at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:852)
                at org.apache.flink.runtime.state.heap.NestedMapsStateTable$NestedMapsStateTableSnapshot.writeMappingsInKeyGroup(NestedMapsStateTable.java:355)
                at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:347)
                at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329)
                at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
                at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:372)
                at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
                at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
                at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:228)
                at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
                at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
                at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
                ... 1 more
        [CIRCULAR REFERENCE:java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)]

11/03/2017 13:46:46     Job execution switched to status FAILING.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).}
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).
        ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
        ... 5 more
        Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
                at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
                ... 5 more
        Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
                at java.util.concurrent.FutureTask.report(FutureTask.java:122)
                at java.util.concurrent.FutureTask.get(FutureTask.java:192)
                at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
                at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
                at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
                ... 7 more
        Caused by: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
                at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
                at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:971)
                at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:838)
                at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:928)
                at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:852)
                at org.apache.flink.runtime.state.heap.NestedMapsStateTable$NestedMapsStateTableSnapshot.writeMappingsInKeyGroup(NestedMapsStateTable.java:355)
                at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:347)
                at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329)
                at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
                at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:372)
                at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
                at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
                at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:228)
                at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
                at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
                at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
                ... 1 more
        [CIRCULAR REFERENCE:java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)]
11/03/2017 13:46:46     Source: json-parser(1/1) switched to CANCELING
11/03/2017 13:46:46     Process(1/1) switched to CANCELING
11/03/2017 13:46:46     Source: json-parser(1/1) switched to CANCELED
11/03/2017 13:46:46     Process(1/1) switched to CANCELED
11/03/2017 13:46:46     Job execution switched to status RESTARTING.
11/03/2017 13:46:56     Job execution switched to status CREATED.
11/03/2017 13:46:56     Job execution switched to status RUNNING.
11/03/2017 13:46:56     Source: json-parser(1/1) switched to SCHEDULED
11/03/2017 13:46:56     Process(1/1) switched to SCHEDULED
11/03/2017 13:46:56     KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to SCHEDULED
11/03/2017 13:46:56     Source: json-parser(1/1) switched to DEPLOYING
11/03/2017 13:46:56     Process(1/1) switched to DEPLOYING
11/03/2017 13:46:56     KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to DEPLOYING
11/03/2017 13:46:56     Process(1/1) switched to RUNNING
11/03/2017 13:46:56     KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to RUNNING
11/03/2017 13:46:56     Source: json-parser(1/1) switched to RUNNING
11/03/2017 13:46:57     KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to FAILED
java.lang.IllegalStateException: Could not initialize keyed state backend.
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.StreamCorruptedException: invalid type code: 00
        at java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2519)
        at java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2553)
        at java.io.ObjectInputStream$BlockDataInputStream.skipBlockData(ObjectInputStream.java:2455)
        at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1951)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1621)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:852)
        at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:518)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
        ... 6 more

11/03/2017 13:46:57     Job execution switched to status FAILING.
java.lang.IllegalStateException: Could not initialize keyed state backend.
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.StreamCorruptedException: invalid type code: 00
        at java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2519)
        at java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2553)
        at java.io.ObjectInputStream$BlockDataInputStream.skipBlockData(ObjectInputStream.java:2455)
        at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1951)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1621)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:852)
        at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:518)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
        ... 6 more
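
As a reading aid for the first trace: the IllegalStateException comes from Preconditions.checkState inside SharedBuffer$SharedBufferSerializer.serialize. The sketch below is NOT Flink's implementation. It is a hypothetical, simplified model (Entry, Edge, assignIds and serializeEdges are invented names) of a serializer that writes references between entries as integer ids. This is one way a message like "Could not find id for entry" can arise: an edge still points at an entry that is no longer among the stored entries.

```scala
object IdLookupSketch {
  // Hypothetical stand-ins for buffer entries and the edges between them.
  final case class Entry(name: String)
  final case class Edge(target: Option[Entry])

  // Pass 1: give every entry that is still stored in the buffer a numeric id.
  def assignIds(stored: Seq[Entry]): Map[Entry, Int] =
    stored.zipWithIndex.toMap

  // Pass 2: write each edge as the id of its target (-1 means "no target").
  // An edge whose target was never assigned an id cannot be written.
  def serializeEdges(edges: Seq[Edge], ids: Map[Entry, Int]): Seq[Int] =
    edges.map {
      case Edge(None) => -1
      case Edge(Some(target)) =>
        ids.getOrElse(
          target,
          throw new IllegalStateException(s"Could not find id for entry: $target"))
    }
}
```

In this model, serializeEdges(Seq(Edge(Some(Entry("b")))), assignIds(Seq(Entry("a")))) throws, because "b" was never assigned an id. Whether something similar happens when a within window times out is only a guess.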


2017-11-03 15:12 GMT+01:00 Federico D'Ambrosio <[hidden email]>:
Hello everyone,

I'm experimenting a bit with FlinkCEP and I'm noticing weird failures with checkpoints when the window of a within clause closes at the same time a checkpoint (synchronous, with both the Fs and RocksDB backends, stored in HDFS) occurs.

The following is the relevant code:

val env : StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60000) //Checkpoints every minute
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints-dir"))

//Pattern
val pattern =
  Pattern
    .begin[EventWithId]("flying").oneOrMore.where(_.event.instantValues.altitude >= 37000)
    .notNext("disappearing").where(_.event.instantValues.altitude >= 37000).within(Time.minutes(1))

// Associate KeyedStream with pattern to be detected
val patternStream  = CEP.pattern(streamById, pattern)


which causes failure on the second checkpoint with the following exception stack trace:

AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).}
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).
        ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
        ... 5 more
        Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
                at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
                ... 5 more
        Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
                at java.util.concurrent.FutureTask.report(FutureTask.java:122)
                at java.util.concurrent.FutureTask.get(FutureTask.java:192)
                at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
                at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
                at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
                ... 7 more
        Caused by: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
                at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
                at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:971)
                at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:838)
                at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:928)
                at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:852)
                at org.apache.flink.runtime.state.heap.NestedMapsStateTable$NestedMapsStateTableSnapshot.writeMappingsInKeyGroup(NestedMapsStateTable.java:355)
                at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:347)
                at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329)
                at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
                at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:372)
                at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
                at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
                at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:228)
                at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
                at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
                at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
                ... 1 more
        [CIRCULAR REFERENCE:java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)]

11/03/2017 13:46:46     Job execution switched to status FAILING.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).}
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).
        ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:4                                                       3)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRu                                                       nnable.run(StreamTask.java:897)
        ... 5 more
        Suppressed: java.lang.Exception: Could not properly cancel managed keyed                                                        state future.
                at org.apache.flink.streaming.api.operators.OperatorSnapshotResu                                                       lt.cancel(OperatorSnapshotResult.java:90)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncChec                                                       kpointRunnable.cleanup(StreamTask.java:1023)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncChec                                                       kpointRunnable.run(StreamTask.java:961)
                ... 5 more
        Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"o                                                       rigin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registrati                                                       on":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time                                                       ":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509                                                       716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
                at java.util.concurrent.FutureTask.report(FutureTask.java:122)
                at java.util.concurrent.FutureTask.get(FutureTask.java:192)
                at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUti                                                       l.java:43)
                at org.apache.flink.runtime.state.StateUtil.discardStateFuture(S                                                       tateUtil.java:85)
                at org.apache.flink.streaming.api.operators.OperatorSnapshotResu                                                       lt.cancel(OperatorSnapshotResult.java:88)
                ... 7 more
        Caused by: java.lang.IllegalStateException: Could not find id for entry:                                                        SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight"                                                       :"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":                                                       370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5)                                                       , SharedBufferEdge(null, 6)], 1)
                at org.apache.flink.util.Preconditions.checkState(Preconditions.                                                       java:195)
                at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.                                                       serialize(SharedBuffer.java:971)
                at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.                                                       serialize(SharedBuffer.java:838)
                at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java                                                       :928)
                at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java                                                       :852)
                at org.apache.flink.runtime.state.heap.NestedMapsStateTable$Nest                                                       edMapsStateTableSnapshot.writeMappingsInKeyGroup(NestedMapsStateTable.java:355)
                at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.p                                                       erformOperation(HeapKeyedStateBackend.java:347)
                at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.p                                                       erformOperation(HeapKeyedStateBackend.java:329)
                at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.cal                                                       l(AbstractAsyncIOCallable.java:72)
                at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.sna                                                       pshot(HeapKeyedStateBackend.java:372)
                at org.apache.flink.streaming.api.operators.AbstractStreamOperat                                                       or.snapshotState(AbstractStreamOperator.java:397)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$Checkpoin                                                       tingOperation.checkpointStreamOperator(StreamTask.java:1162)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$Checkpoin                                                       tingOperation.executeCheckpointing(StreamTask.java:1094)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpoin                                                       tState(StreamTask.java:654)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.performCh                                                       eckpoint(StreamTask.java:590)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCh                                                       eckpointOnBarrier(StreamTask.java:543)
                at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyChe                                                       ckpoint(BarrierBuffer.java:378)
                at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBa                                                       rrier(BarrierBuffer.java:228)
                at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNo                                                       nBlocked(BarrierBuffer.java:183)
                at org.apache.flink.streaming.runtime.io.StreamInputProcessor.pr                                                       ocessInput(StreamInputProcessor.java:213)
                at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.r                                                       un(OneInputStreamTask.java:69)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(St                                                       reamTask.java:263)
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
                ... 1 more
        [CIRCULAR REFERENCE:java.lang.IllegalStateException: Could not find id f                                                       or entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG"                                                       ,"flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593"                                                       ,"speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat                                                       ":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge                                                       (null, 5), SharedBufferEdge(null, 6)], 1)]
11/03/2017 13:46:46     Source: json-parser(1/1) switched to CANCELING
11/03/2017 13:46:46     Process(1/1) switched to CANCELING
11/03/2017 13:46:46     Source: json-parser(1/1) switched to CANCELED
11/03/2017 13:46:46     Process(1/1) switched to CANCELED
11/03/2017 13:46:46     Job execution switched to status RESTARTING.
11/03/2017 13:46:56     Job execution switched to status CREATED.
11/03/2017 13:46:56     Job execution switched to status RUNNING.
11/03/2017 13:46:56     Source: json-parser(1/1) switched to SCHEDULED
11/03/2017 13:46:56     Process(1/1) switched to SCHEDULED
11/03/2017 13:46:56     KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to SCHEDULED
11/03/2017 13:46:56     Source: json-parser(1/1) switched to DEPLOYING
11/03/2017 13:46:56     Process(1/1) switched to DEPLOYING
11/03/2017 13:46:56     KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to DEPLOYING
11/03/2017 13:46:56     Process(1/1) switched to RUNNING
11/03/2017 13:46:56     KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to RUNNING
11/03/2017 13:46:56     Source: json-parser(1/1) switched to RUNNING
11/03/2017 13:46:57     KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to FAILED
java.lang.IllegalStateException: Could not initialize keyed state backend.
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.StreamCorruptedException: invalid type code: 00
        at java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2519)
        at java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2553)
        at java.io.ObjectInputStream$BlockDataInputStream.skipBlockData(ObjectInputStream.java:2455)
        at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1951)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1621)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:852)
        at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:518)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
        ... 6 more

11/03/2017 13:46:57     Job execution switched to status FAILING.
java.lang.IllegalStateException: Could not initialize keyed state backend.
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.StreamCorruptedException: invalid type code: 00
        at java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2519)
        at java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2553)
        at java.io.ObjectInputStream$BlockDataInputStream.skipBlockData(ObjectInputStream.java:2455)
        at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1951)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1621)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:852)
        at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:518)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
        ... 6 more



What is happening here? Am I doing something wrong? Is there some sort of conflict between the deadlines of within clauses and checkpoint deadlines?

I found the following similar JIRA pages, but none of those mention circular references: https://issues.apache.org/jira/browse/FLINK-6321

Kind Regards,
Federico D'Ambrosio



--
Federico D'Ambrosio

Re: FlinkCEP, circular references and checkpointing failures

Kostas Kloudas
In reply to this post by Federico D'Ambrosio
Hi Federico,

I assume that you are using Flink 1.3, right?

In this case, in 1.4 we have fixed a bug that seems similar to your case:

Could you try the current master to see if it fixes your problem?

Thanks,
Kostas



Re: FlinkCEP, circular references and checkpointing failures

Federico D'Ambrosio
Hi Kostas,

yes, I'm using 1.3.2. I'll try the current master and I'll get back to you.

--
Federico D'Ambrosio

Re: FlinkCEP, circular references and checkpointing failures

Kostas Kloudas
Perfect! Thanks a lot!

Kostas



Re: FlinkCEP, circular references and checkpointing failures

Federico D'Ambrosio
Hi Kostas,

I just tried running the same job with 1.4-SNAPSHOT for 10 minutes and it didn't crash, so it was the same underlying issue as the JIRA you linked.

Do you happen to know when the 1.4 stable release is expected?

Thank you very much,
Federico

--
Federico D'Ambrosio

Re: FlinkCEP, circular references and checkpointing failures

Kostas Kloudas
Hi Federico,

Thanks for trying it out! 
Great to hear that your problem was fixed!

The feature freeze for the release is going to be next week, and I would expect 1 or 2 more weeks of testing.
So I would say in about 2.5 weeks. But this is of course subject to potential issues we may find during testing.

Cheers,
Kostas



Re: FlinkCEP, circular references and checkpointing failures

Federico D'Ambrosio
Thank you very much for your prompt response, Kostas!

Cheers,
Federico

--
Federico D'Ambrosio

Re: FlinkCEP, circular references and checkpointing failures

Shailesh Jain
Hi,

I think I've hit this same issue on a 3-node standalone cluster (1.4.2) using HDFS (2.8.4) as the state backend.

2018-09-26 17:07:39,370 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340).
2018-09-26 17:07:39,370 INFO  org.apache.flink.runtime.taskmanager.Task                     - SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 6 for operator SelectCepOperator (1/1).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 6 for operator SelectCepOperator (1/1).
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
    ... 5 more
    Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
        ... 5 more
    Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
        at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
        ... 7 more
    Caused by: java.lang.NullPointerException
        at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:954)
        at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:825)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeMappingsInKeyGroup(CopyOnWriteStateTableSnapshot.java:196)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:390)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:339)
        at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
        ... 5 more
    [CIRCULAR REFERENCE:java.lang.NullPointerException]

Any ideas on why I'm hitting this, especially when this issue (https://issues.apache.org/jira/browse/FLINK-7756) says it has been fixed in 1.4.2?


Re: FlinkCEP, circular references and checkpointing failures

Dawid Wysakowicz-2

Hi Shailesh,

Are you sure you are using version 1.4.2? Do you run a vanilla Flink, or have you introduced some changes? I am asking because the line numbers in the stack trace do not align with the source code for 1.4.2.

Also, it is a different exception from the one in the issue you've linked, so if it is a problem, then it is definitely a different one. Lastly, I would recommend upgrading to the newest version, as we have rewritten the SharedBuffer implementation in 1.6.0.

Best,

Dawid
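
One way to follow up on the version question is to check which jar the CEP classes are actually loaded from. The sketch below is only an illustration and not something taken from this thread: the `ClassOrigin` object and its `describe` helper are hypothetical names, and it relies only on standard JDK reflection calls. The class names passed in `main` are the ones that appear in the stack trace above. Whether a version string is printed depends on the jar manifest carrying an Implementation-Version entry, which is an assumption.

```scala
import scala.util.Try

// Hypothetical helper (not part of Flink): reports where a class was loaded
// from and which implementation version its jar manifest declares, if any.
object ClassOrigin {

  def describe(className: String): String =
    Try(Class.forName(className)).toOption match {
      case None =>
        s"$className: not on the classpath"
      case Some(cls) =>
        // getCodeSource is null for classes loaded by the bootstrap loader.
        val location = Option(cls.getProtectionDomain.getCodeSource)
          .flatMap(cs => Option(cs.getLocation))
          .map(_.toString)
          .getOrElse("<bootstrap or unknown>")
        // The version is only present when the manifest declares it.
        val version = Option(cls.getPackage)
          .flatMap(p => Option(p.getImplementationVersion))
          .getOrElse("<no version in manifest>")
        s"$className: loaded from $location, version $version"
    }

  def main(args: Array[String]): Unit = {
    // Class names taken from the stack trace in this thread.
    println(describe("org.apache.flink.cep.nfa.SharedBuffer"))
    println(describe("org.apache.flink.cep.nfa.NFA"))
  }
}
```

Run with the same classpath as the job, this prints the jar each class comes from. If that jar turns out to be the job's own fat jar rather than the cluster's Flink distribution, a flink-cep version bundled with the job could be one possible explanation for line numbers that do not match the 1.4.2 sources.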


On 26/09/18 13:50, Shailesh Jain wrote:
Hi,

I think I've hit this same issue on a 3 node standalone cluster (1.4.2) using HDFS (2.8.4) as state backend.

2018-09-26 17:07:39,370 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340).
2018-09-26 17:07:39,370 INFO  org.apache.flink.runtime.taskmanager.Task                     - SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 6 for operator SelectCepOperator (1/1).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 6 for operator SelectCepOperator (1/1).
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
    ... 5 more
    Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
        ... 5 more
    Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
        at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
        ... 7 more
    Caused by: java.lang.NullPointerException
        at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:954)
        at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:825)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeMappingsInKeyGroup(CopyOnWriteStateTableSnapshot.java:196)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:390)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:339)
        at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
        ... 5 more
    [CIRCULAR REFERENCE:java.lang.NullPointerException]

Any ideas on why I'm hitting this, especially when this issue (https://issues.apache.org/jira/browse/FLINK-7756) says it has been fixed in 1.4.2?
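
A note on reading the trace above: the `[CIRCULAR REFERENCE:...]` line is printed by the JDK's `Throwable.printStackTrace` when it reaches a throwable it has already printed once in the same trace. Here that is consistent with the same NullPointerException being reachable both through the suppressed "could not cancel" exception and as the direct cause. The sketch below reproduces that shape with plain JDK classes; the class and method names are hypothetical, and it does not involve Flink at all.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.concurrent.ExecutionException;

public class CircularReferenceDemo {

    /** Builds an exception graph in which one NullPointerException is reachable twice. */
    static Throwable buildFailure() {
        NullPointerException root = new NullPointerException();

        // First path: the failed future's result, wrapped the way FutureTask.get() reports it.
        ExecutionException materialize = new ExecutionException(root);

        // Second path: cleanup fails on the same future and is attached as suppressed.
        Exception cancel = new Exception(
                "Could not properly cancel managed keyed state future.",
                new ExecutionException(root));
        materialize.addSuppressed(cancel);

        return new Exception("Could not materialize checkpoint.", materialize);
    }

    /** Renders the stack trace to a string instead of stderr. */
    static String render(Throwable t) {
        StringWriter out = new StringWriter();
        t.printStackTrace(new PrintWriter(out, true));
        return out.toString();
    }

    public static void main(String[] args) {
        // The suppressed branch is printed first, so the NullPointerException appears
        // in full there and only as a circular-reference marker at the end.
        System.out.println(render(buildFailure()));
    }
}
```

So the marker by itself says nothing about the CEP state; the frames under the first full `Caused by: java.lang.NullPointerException` are the ones to look at.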


Re: FlinkCEP, circular references and checkpointing failures

Shailesh Jain
Hi Dawid,

Yes, it is version 1.4.2. We are running Flink that is vanilla apart from a couple of changes in the CEP operator specifically (top 3 commits here: https://github.com/jainshailesh/flink/commits/poc_on_1.4.2). The changes I've made to the CEP operators do not touch the checkpointing path; they only overload the operator to handle event time in a specific way.

We are hitting this in production, so I'm not sure it'll be feasible to move to 1.6.0 immediately, but eventually yes.

Thanks,
Shailesh


Re: FlinkCEP, circular references and checkpointing failures

Dawid Wysakowicz-2

Hi Shailesh,

I am afraid it is going to be hard to help you, as this branch differs significantly from the 1.4.2 release (I ran a diff between your branch and tag/release-1.4.2). Moreover, the code in the branch you've provided still does not correspond to the line numbers in the exception you posted previously. Could you check whether the problem occurs on vanilla Flink as well?

Best,

Dawid



Re: FlinkCEP, circular references and checkpointing failures

Shailesh Jain
Hi Dawid,

Thanks for your time on this. The diff should have pointed out only the top 3 commits, but since it did not, it is possible I did not rebase my branch against 1.4.2 correctly. I'll check this out and get back to you if I hit the same issue again.

Thanks again,
Shailesh


Re: FlinkCEP, circular references and checkpointing failures

Shailesh Jain
Hi Dawid,

I've upgraded to Flink 1.6.1 and rebased my changes against the 1.6.1 tag; the only commit on top of 1.6 is this: https://github.com/jainshailesh/flink/commit/797e3c4af5b28263fd98fb79daaba97cabf3392c

I ran two separate, identical jobs (with and without checkpointing enabled). I'm hitting an ArrayIndexOutOfBoundsException (and sometimes an NPE) only when checkpointing (HDFS backend) is enabled, with the stack trace below.

I did see a similar problem with different operators here (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/IndexOutOfBoundsException-on-deserialization-after-updating-to-1-6-1-td23933.html). Is this a known issue that is being addressed?

Any ideas on what could be causing this?

Thanks,
Shailesh


2018-10-24 17:04:13,365 INFO  org.apache.flink.runtime.taskmanager.Task                     - SelectCepOperatorMixedTime (1/1) - SelectCepOperatorMixedTime (1/1) (3d984b7919342a3886593401088ca2cd) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
        at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
        at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
        at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
        at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
        at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
        at org.apache.flink.cep.operator.AbstractKeyedCEPPatternMixedTimeApproachOperator.processElement(AbstractKeyedCEPPatternMixedTimeApproachOperator.java:45)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.WrappingRuntimeException: java.lang.ArrayIndexOutOfBoundsException: -1
        at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:305)
        at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
        at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:301)
        at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:291)
        at org.apache.flink.cep.nfa.NFA$ConditionContext.getEventsForPattern(NFA.java:811)
        at com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:70)
        at com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:62)
        at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
        at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
        ... 10 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
        at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
        at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
        at org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:120)
        at org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:95)
        at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:113)
        at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:49)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
        at org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
        at org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
        at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:303)
        ... 18 more
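
The trace above nests three exceptions (FlinkRuntimeException, WrappingRuntimeException, ArrayIndexOutOfBoundsException), and the innermost one is the interesting part. When logging such failures, a small helper that walks `getCause()` to the end can make the root cause easier to spot. This is only an illustrative sketch with a hypothetical class name, using plain JDK exceptions to stand in for the Flink ones.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

public class RootCause {

    /** Follows getCause() to the innermost throwable, stopping if the chain loops. */
    static Throwable of(Throwable t) {
        // Identity-based set, so two equal-looking exceptions are still distinct.
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        Throwable current = t;
        while (current.getCause() != null && seen.add(current)) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Stand-ins for the three levels seen in the trace.
        Throwable failure = new RuntimeException(
                "Failure happened in filter function.",
                new RuntimeException(new ArrayIndexOutOfBoundsException("-1")));

        Throwable root = of(failure);
        System.out.println(root.getClass().getName() + ": " + root.getMessage());
    }
}
```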

On Fri, Sep 28, 2018 at 11:00 AM Shailesh Jain <[hidden email]> wrote:
Hi Dawid,

Thanks for your time on this. The diff should have pointed out only the top 3 commits, but since it did not, it is possible I did not rebase my branch against 1.4.2 correctly. I'll check this out and get back to you if I hit the same issue again.

Thanks again,
Shailesh

On Thu, Sep 27, 2018 at 1:00 PM Dawid Wysakowicz <[hidden email]> wrote:

Hi Shailesh,

I am afraid it is gonna be hard to help you, as this branch differs significantly from 1.4.2 release (I've done diff across your branch and tag/release-1.4.2). Moreover the code in the branch you've provided still does not correspond to the lines in the exception you've posted previously. Could you check if the problem occurs on vanilla flink as well?

Best,

Dawid


On 27/09/18 08:22, Shailesh Jain wrote:
Hi Dawid,

Yes, it is version 1.4.2. We are running vanilla Flink, apart from a couple of changes to the CEP operator specifically (top 3 commits here: https://github.com/jainshailesh/flink/commits/poc_on_1.4.2). The changes I've made to the CEP operators do not touch the checkpointing path; they only overload the operator for a specific way of handling event time.

We are hitting this in production, so I'm not sure it'll be feasible to move to 1.6.0 immediately, but eventually yes.

Thanks,
Shailesh

On Wed, Sep 26, 2018 at 5:44 PM Dawid Wysakowicz <[hidden email]> wrote:

Hi Shailesh,

Are you sure you are using version 1.4.2? Do you run vanilla Flink, or have you introduced some changes? I am asking because the line numbers in the stack trace do not align with the source code for 1.4.2.

Also, it is a different exception from the one in the issue you've linked, so if it is a problem then it is definitely a different one. Lastly, I would recommend upgrading to the newest version, as we have rewritten the SharedBuffer implementation in 1.6.0.

Best,

Dawid


On 26/09/18 13:50, Shailesh Jain wrote:
Hi,

I think I've hit this same issue on a 3 node standalone cluster (1.4.2) using HDFS (2.8.4) as state backend.

2018-09-26 17:07:39,370 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340).
2018-09-26 17:07:39,370 INFO  org.apache.flink.runtime.taskmanager.Task                     - SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 6 for operator SelectCepOperator (1/1).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 6 for operator SelectCepOperator (1/1).
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
    ... 5 more
    Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
        ... 5 more
    Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
        at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
        ... 7 more
    Caused by: java.lang.NullPointerException
        at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:954)
        at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:825)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeMappingsInKeyGroup(CopyOnWriteStateTableSnapshot.java:196)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:390)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:339)
        at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
        ... 5 more
    [CIRCULAR REFERENCE:java.lang.NullPointerException]

Any ideas on why I'm hitting this, especially when this (https://issues.apache.org/jira/browse/FLINK-7756) says it has been fixed in 1.4.2?
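
A note on the `[CIRCULAR REFERENCE:java.lang.NullPointerException]` line that closes the trace above: that marker is emitted by the JDK's `Throwable.printStackTrace`, not by Flink or Kryo. It appears when the printer reaches an exception instance it has already printed. Here that is apparently the same NullPointerException, reached once through the suppressed cleanup failure and once as the cause of the primary failure. The sketch below reproduces that shape with plain JDK classes; the class name `CircularReferenceMarker` and the wiring of the exceptions are illustrative assumptions, not Flink code.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.concurrent.ExecutionException;

public class CircularReferenceMarker {

    /** Builds an exception graph shaped like the one in the trace and renders it. */
    static String render() {
        // One root-cause instance shared by two different wrappers.
        NullPointerException root = new NullPointerException();

        // Primary failure: wraps the root cause directly.
        ExecutionException primary = new ExecutionException(root);

        // Cleanup failure: a separate wrapper chain that ends in the SAME instance.
        Exception cleanupFailure = new Exception(
                "Could not properly cancel managed keyed state future.",
                new ExecutionException(root));
        primary.addSuppressed(cleanupFailure);

        // Suppressed exceptions are printed before the cause, so the root is
        // printed in full under "Suppressed:" and then flagged when it is
        // reached a second time as the cause of the primary failure.
        StringWriter out = new StringWriter();
        primary.printStackTrace(new PrintWriter(out, true));
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.print(render());
    }
}
```

The exact spelling of the marker differs slightly between JDK versions, so anything that parses such traces should rely only on the `CIRCULAR REFERENCE` substring. The marker by itself describes the exception graph; it says nothing about cycles in the event objects being serialized.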

On Sat, Nov 4, 2017 at 12:34 AM Federico D'Ambrosio <[hidden email]> wrote:
Thank you very much for your steady response, Kostas!

Cheers,
Federico

2017-11-03 16:26 GMT+01:00 Kostas Kloudas <[hidden email]>:
Hi Federico,

Thanks for trying it out! 
Great to hear that your problem was fixed!

The feature freeze for the release is going to be next week, and I would expect 1 or 2 more weeks of testing.
So I would say in about 2.5 weeks. But this is of course subject to potential issues we may find during testing.

Cheers,
Kostas

On Nov 3, 2017, at 4:22 PM, Federico D'Ambrosio <[hidden email]> wrote:

Hi Kostas,

I just tried running the same job with 1.4-SNAPSHOT for 10 minutes and it didn't crash, so it was the same underlying issue as in the JIRA you linked.

Do you happen to know when the 1.4 stable release is expected?

Thank you very much,
Federico

2017-11-03 15:25 GMT+01:00 Kostas Kloudas <[hidden email]>:
Perfect! thanks a lot!

Kostas

On Nov 3, 2017, at 3:23 PM, Federico D'Ambrosio <[hidden email]> wrote:

Hi Kostas,

yes, I'm using 1.3.2. I'll try the current master and I'll get back to you.

2017-11-03 15:21 GMT+01:00 Kostas Kloudas <[hidden email]>:
Hi Federico,

I assume that you are using Flink 1.3, right?

In this case, in 1.4 we have fixed a bug that seems similar to your case:

Could you try the current master to see if it fixes your problem?

Thanks,
Kostas

On Nov 3, 2017, at 3:12 PM, Federico D'Ambrosio <[hidden email]> wrote:

 Could not find id for entry:                                                        




--
Federico D'Ambrosio






Re: FlinkCEP, circular references and checkpointing failures

Shailesh Jain
Bump.

On Thu, Oct 25, 2018 at 9:11 AM Shailesh Jain <[hidden email]> wrote:
Hi Dawid,

I've upgraded to Flink 1.6.1 and rebased my changes against the tag 1.6.1; the only commit on top of it is this: https://github.com/jainshailesh/flink/commit/797e3c4af5b28263fd98fb79daaba97cabf3392c

I ran two separate, otherwise identical jobs (with and without checkpointing enabled). I'm hitting an ArrayIndexOutOfBoundsException (and sometimes an NPE) only when checkpointing (HDFS backend) is enabled, with the stack trace below.

I did see a similar problem with different operators here (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/IndexOutOfBoundsException-on-deserialization-after-updating-to-1-6-1-td23933.html). Is this a known issue which is getting addressed?

Any ideas on what could be causing this?

Thanks,
Shailesh


2018-10-24 17:04:13,365 INFO  org.apache.flink.runtime.taskmanager.Task                     - SelectCepOperatorMixedTime (1/1) - SelectCepOperatorMixedTime (1/1) (3d984b7919342a3886593401088ca2cd) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
        at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
        at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
        at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
        at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
        at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
        at org.apache.flink.cep.operator.AbstractKeyedCEPPatternMixedTimeApproachOperator.processElement(AbstractKeyedCEPPatternMixedTimeApproachOperator.java:45)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.WrappingRuntimeException: java.lang.ArrayIndexOutOfBoundsException: -1
        at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:305)
        at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
        at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:301)
        at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:291)
        at org.apache.flink.cep.nfa.NFA$ConditionContext.getEventsForPattern(NFA.java:811)
        at com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:70)
        at com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:62)
        at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
        at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
        ... 10 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
        at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
        at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
        at org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:120)
        at org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:95)
        at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:113)
        at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:49)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
        at org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
        at org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
        at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:303)
        ... 18 more

Re: FlinkCEP, circular references and checkpointing failures

Dawid Wysakowicz-2
In reply to this post by Shailesh Jain

This looks like a problem with serializing your events using Kryo. I'm adding Gordon to cc, as he was recently working on the serializers. He might be able to give you more insight into what is going wrong.

Best,

Dawid
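
For readers who want to check why an event class ends up on the Kryo path at all: Flink's documentation describes a class as a POJO (and so not handled by the generic Kryo serializer) when it is public, has a public no-argument constructor, and every non-static, non-transient field is either public or reachable through a getter and setter. The sketch below is only a rough, JDK-only approximation of those documented rules, not Flink's own type extraction. The thread does not show what the actual event classes look like, so `GoodEvent`, `BadEvent` and `PojoRuleCheck` are hypothetical names used for illustration.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class PojoRuleCheck {

    /** Hypothetical event type that satisfies the documented rules. */
    public static class GoodEvent {
        public String id;
        private double altitude;

        public GoodEvent() {}

        public double getAltitude() { return altitude; }
        public void setAltitude(double altitude) { this.altitude = altitude; }
    }

    /** Hypothetical event type that breaks them: no no-arg constructor, private field without accessors. */
    public static class BadEvent {
        private final String id;

        public BadEvent(String id) { this.id = id; }
    }

    /** Returns a human-readable list of rule violations; an empty list means none were found. */
    public static List<String> violations(Class<?> type) {
        List<String> problems = new ArrayList<>();
        if (!Modifier.isPublic(type.getModifiers())) {
            problems.add("class is not public");
        }
        try {
            type.getConstructor(); // only matches a public no-argument constructor
        } catch (NoSuchMethodException e) {
            problems.add("no public no-argument constructor");
        }
        // Walk the class and its superclasses, skipping static, transient and synthetic fields.
        for (Class<?> c = type; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int m = f.getModifiers();
                if (Modifier.isStatic(m) || Modifier.isTransient(m) || f.isSynthetic()) {
                    continue;
                }
                if (!Modifier.isPublic(m) && !hasAccessors(type, f)) {
                    problems.add("field '" + f.getName() + "' is not public and lacks getter/setter");
                }
            }
        }
        return problems;
    }

    /** True if the type exposes both a public getter (get/is) and a public setter for the field. */
    private static boolean hasAccessors(Class<?> type, Field f) {
        String name = f.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        boolean getter = hasMethod(type, "get" + suffix) || hasMethod(type, "is" + suffix);
        boolean setter = hasMethod(type, "set" + suffix, f.getType());
        return getter && setter;
    }

    private static boolean hasMethod(Class<?> type, String name, Class<?>... params) {
        try {
            type.getMethod(name, params); // public methods only, including inherited ones
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("GoodEvent: " + violations(GoodEvent.class));
        System.out.println("BadEvent: " + violations(BadEvent.class));
    }
}
```

A class that passes this check can still be routed to Kryo if one of its field types is itself generic, so the result is a hint about where to look rather than a verdict.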

Re: FlinkCEP, circular references and checkpointing failures

Till Rohrmann
Hi Shailesh,

could you maybe provide us with an example program that reproduces this problem? This would help the community debug it. It does not look right and might point to a bug in Flink. Thanks a lot!

Cheers,
Till

On Tue, Oct 30, 2018 at 9:10 AM Dawid Wysakowicz <[hidden email]> wrote:

This looks like a problem with serializing your events using Kryo. I'm adding Gordon to cc, as he was recently working on the serializers. He might be able to give you more insight into what is going wrong.

Best,

Dawid

On 25/10/2018 05:41, Shailesh Jain wrote:
Hi Dawid,

I've upgraded to Flink 1.6.1 and rebased my changes against the tag 1.6.1; the only commit on top of 1.6 is this: https://github.com/jainshailesh/flink/commit/797e3c4af5b28263fd98fb79daaba97cabf3392c

I ran two separate, identical jobs (with and without checkpointing enabled). I'm hitting an ArrayIndexOutOfBoundsException (and sometimes an NPE) only when checkpointing (HDFS backend) is enabled, with the stack trace below.

I did see a similar problem with different operators here (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/IndexOutOfBoundsException-on-deserialization-after-updating-to-1-6-1-td23933.html). Is this a known issue which is getting addressed?

Any ideas on what could be causing this?

Thanks,
Shailesh


2018-10-24 17:04:13,365 INFO  org.apache.flink.runtime.taskmanager.Task                     - SelectCepOperatorMixedTime (1/1) - SelectCepOperatorMixedTime (1/1) (3d984b7919342a3886593401088ca2cd) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
        at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
        at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
        at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
        at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
        at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
        at org.apache.flink.cep.operator.AbstractKeyedCEPPatternMixedTimeApproachOperator.processElement(AbstractKeyedCEPPatternMixedTimeApproachOperator.java:45)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.WrappingRuntimeException: java.lang.ArrayIndexOutOfBoundsException: -1
        at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:305)
        at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
        at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:301)
        at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:291)
        at org.apache.flink.cep.nfa.NFA$ConditionContext.getEventsForPattern(NFA.java:811)
        at com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:70)
        at com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:62)
        at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
        at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
        ... 10 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
        at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
        at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
        at org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:120)
        at org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:95)
        at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:113)
        at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:49)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
        at org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
        at org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
        at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:303)
        ... 18 more

On Fri, Sep 28, 2018 at 11:00 AM Shailesh Jain <[hidden email]> wrote:
Hi Dawid,

Thanks for your time on this. The diff should have pointed out only the top 3 commits, but since it did not, it is possible I did not rebase my branch against 1.4.2 correctly. I'll check this out and get back to you if I hit the same issue again.

Thanks again,
Shailesh

On Thu, Sep 27, 2018 at 1:00 PM Dawid Wysakowicz <[hidden email]> wrote:

Hi Shailesh,

I am afraid it is going to be hard to help you, as this branch differs significantly from the 1.4.2 release (I've done a diff between your branch and tag/release-1.4.2). Moreover, the code in the branch you've provided still does not correspond to the line numbers in the exception you've posted previously. Could you check if the problem occurs on vanilla Flink as well?

Best,

Dawid


On 27/09/18 08:22, Shailesh Jain wrote:
Hi Dawid,

Yes, it is version 1.4.2. We are running vanilla Flink, but have added a couple of changes in the CEP operator specifically (top 3 commits here: https://github.com/jainshailesh/flink/commits/poc_on_1.4.2). The changes I've made to the CEP operators do not touch the checkpointing path; they only overload the operator for a specific way of handling event time.

We are hitting this in production, so I'm not sure it'll be feasible to move to 1.6.0 immediately, but eventually yes.

Thanks,
Shailesh

On Wed, Sep 26, 2018 at 5:44 PM Dawid Wysakowicz <[hidden email]> wrote:

Hi Shailesh,

Are you sure you are using version 1.4.2? Do you run vanilla Flink, or have you introduced some changes? I am asking because the line numbers in the stack trace do not align with the source code for 1.4.2.

Also, it is a different exception than the one in the issue you've linked, so if it is a problem then it is definitely a different one. Lastly, I would recommend upgrading to the newest version, as we have rewritten the SharedBuffer implementation in 1.6.0.

Best,

Dawid


On 26/09/18 13:50, Shailesh Jain wrote:
Hi,

I think I've hit this same issue on a 3 node standalone cluster (1.4.2) using HDFS (2.8.4) as state backend.

2018-09-26 17:07:39,370 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340).
2018-09-26 17:07:39,370 INFO  org.apache.flink.runtime.taskmanager.Task                     - SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 6 for operator SelectCepOperator (1/1).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 6 for operator SelectCepOperator (1/1).
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
    ... 5 more
    Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
        ... 5 more
    Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
        at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
        ... 7 more
    Caused by: java.lang.NullPointerException
        at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:954)
        at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:825)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeMappingsInKeyGroup(CopyOnWriteStateTableSnapshot.java:196)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:390)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:339)
        at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
        ... 5 more
    [CIRCULAR REFERENCE:java.lang.NullPointerException]

Any ideas on why I'm hitting this, especially when this (https://issues.apache.org/jira/browse/FLINK-7756) says it has been fixed in 1.4.2?

On Sat, Nov 4, 2017 at 12:34 AM Federico D'Ambrosio <[hidden email]> wrote:
Thank you very much for your steady response, Kostas!

Cheers,
Federico

2017-11-03 16:26 GMT+01:00 Kostas Kloudas <[hidden email]>:
Hi Federico,

Thanks for trying it out! 
Great to hear that your problem was fixed!

The feature freeze for the release is going to be next week, and I would expect 1 or 2 more weeks of testing.
So I would say in 2.5 weeks. But this is of course subject to potential issues we may find during testing.

Cheers,
Kostas


Re: FlinkCEP, circular references and checkpointing failures

Stefan Richter
Hi,

I think I can already spot the problem: LockableTypeSerializer.duplicate() is not properly implemented because it also has to call duplicate() on the element serialiser that is passed into the constructor of the new instance. I will open an issue and fix the problem.
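
To illustrate the contract described above, here is a minimal sketch in plain Java. It is a simplified, hypothetical stand-in and not the actual Flink classes: `Serializer`, `StatefulStringSerializer`, `Wrapped`, `WrappedSerializer` and `duplicateShallow` are made-up names. The assumption is that the wrapped serializer is stateful (as Kryo-backed serializers are), so a wrapping serializer has to duplicate it as well. Otherwise two threads, for example the task thread and the asynchronous checkpointing thread, can end up sharing one stateful instance.

```java
// Simplified stand-in for a serializer contract; NOT Flink's TypeSerializer.
interface Serializer<T> {
    T copy(T value);

    // Must return an instance that is safe to use from another thread.
    Serializer<T> duplicate();
}

// Stateful serializer: it reuses a scratch buffer between calls, so a single
// instance must not be shared across threads.
final class StatefulStringSerializer implements Serializer<String> {
    private final StringBuilder scratch = new StringBuilder();

    @Override
    public String copy(String value) {
        scratch.setLength(0);
        scratch.append(value);
        return scratch.toString();
    }

    @Override
    public Serializer<String> duplicate() {
        return new StatefulStringSerializer(); // fresh instance, fresh buffer
    }
}

// Hypothetical wrapper value: an element plus a reference count.
final class Wrapped<T> {
    final T element;
    final int refCount;

    Wrapped(T element, int refCount) {
        this.element = element;
        this.refCount = refCount;
    }
}

final class WrappedSerializer<T> implements Serializer<Wrapped<T>> {
    final Serializer<T> elementSerializer;

    WrappedSerializer(Serializer<T> elementSerializer) {
        this.elementSerializer = elementSerializer;
    }

    @Override
    public Wrapped<T> copy(Wrapped<T> value) {
        return new Wrapped<>(elementSerializer.copy(value.element), value.refCount);
    }

    // Problematic variant: the new wrapper still shares the nested serializer.
    WrappedSerializer<T> duplicateShallow() {
        return new WrappedSerializer<>(elementSerializer);
    }

    // Intended behaviour: duplicate the nested serializer as well.
    @Override
    public WrappedSerializer<T> duplicate() {
        return new WrappedSerializer<>(elementSerializer.duplicate());
    }
}
```

With `duplicateShallow()` the original and the copy hold the very same `elementSerializer` object, while `duplicate()` gives each wrapper its own instance.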

Best,
Stefan


Re: FlinkCEP, circular references and checkpointing failures

Till Rohrmann
Really good find, Stefan!

On Wed, Nov 7, 2018 at 5:28 PM Stefan Richter <[hidden email]> wrote:
Hi,

I think I can already spot the problem: LockableTypeSerializer.duplicate() is not properly implemented because it also has to call duplicate() on the element serialiser that is passed into the constructor of the new instance. I will open an issue and fix the problem.

Best,
Stefan

On 7. Nov 2018, at 17:17, Till Rohrmann <[hidden email]> wrote:

Hi Shailesh,

could you maybe provide us with an example program which is able to reproduce this problem? This would help the community to better debug the problem. It looks not right and might point towards a bug in Flink. Thanks a lot!

Cheers,
Till

On Tue, Oct 30, 2018 at 9:10 AM Dawid Wysakowicz <[hidden email]> wrote:

This is some problem with serializing your events using Kryo. I'm adding Gordon to cc, as he was recently working with serializers. He might give you more insights what is going wrong.

Best,

Dawid

On 25/10/2018 05:41, Shailesh Jain wrote:
Hi Dawid,

I've upgraded to flink 1.6.1 and rebased by changes against the tag 1.6.1, the only commit on top of 1.6 is this: https://github.com/jainshailesh/flink/commit/797e3c4af5b28263fd98fb79daaba97cabf3392c

I ran two separate identical jobs (with and without checkpointing enabled), I'm hitting a ArrayIndexOutOfBoundsException (and sometimes NPE) only when checkpointing (HDFS backend) is enabled, with the below stack trace.

I did see a similar problem with different operators here (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/IndexOutOfBoundsException-on-deserialization-after-updating-to-1-6-1-td23933.html). Is this a known issue which is getting addressed?

Any ideas on what could be causing this?

Thanks,
Shailesh


2018-10-24 17:04:13,365 INFO  org.apache.flink.runtime.taskmanager.Task                     - SelectCepOperatorMixedTime (1/1) - SelectCepOperatorMixedTime (1/1) (3d984b7919342a3886593401088ca2cd) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
        at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
        at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
        at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
        at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
        at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
        at org.apache.flink.cep.operator.AbstractKeyedCEPPatternMixedTimeApproachOperator.processElement(AbstractKeyedCEPPatternMixedTimeApproachOperator.java:45)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.WrappingRuntimeException: java.lang.ArrayIndexOutOfBoundsException: -1
        at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:305)
        at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
        at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:301)
        at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:291)
        at org.apache.flink.cep.nfa.NFA$ConditionContext.getEventsForPattern(NFA.java:811)
        at com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:70)
        at com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:62)
        at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
        at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
        ... 10 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
        at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
        at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
        at org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:120)
        at org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:95)
        at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:113)
        at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:49)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
        at org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
        at org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
        at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:303)
        ... 18 more

On Fri, Sep 28, 2018 at 11:00 AM Shailesh Jain <[hidden email]> wrote:
Hi Dawid,

Thanks for your time on this. The diff should have pointed out only the top 3 commits, but since it did not, it is possible I did not rebase my branch against 1.4.2 correctly. I'll check this out and get back to you if I hit the same issue again.

Thanks again,
Shailesh

On Thu, Sep 27, 2018 at 1:00 PM Dawid Wysakowicz <[hidden email]> wrote:

Hi Shailesh,

I am afraid it is gonna be hard to help you, as this branch differs significantly from 1.4.2 release (I've done diff across your branch and tag/release-1.4.2). Moreover the code in the branch you've provided still does not correspond to the lines in the exception you've posted previously. Could you check if the problem occurs on vanilla flink as well?

Best,

Dawid


On 27/09/18 08:22, Shailesh Jain wrote:
Hi Dawid,

Yes, it is version 1.4.2. We are running vanilla flink, but have added a couple of changes in the CEP operator specifically (top 3 commits here: https://github.com/jainshailesh/flink/commits/poc_on_1.4.2). Changes I've made to CEP operators do not touch the checkpointing path, just overloading the operator for a specific way of handling event time.

We are hitting this in production, so I'm not sure it'll be feasible to move to 1.6.0 immediately, but eventually yes.

Thanks,
Shailesh

On Wed, Sep 26, 2018 at 5:44 PM Dawid Wysakowicz <[hidden email]> wrote:

Hi Shailesh,

Are you sure you are using version 1.4.2? Do you run vanilla Flink, or have you introduced some changes? I am asking because the lines in the stack trace do not align with the source code for 1.4.2.

Also, it is a different exception than the one in the issue you've linked, so if it is a problem, then it is definitely a different one. Lastly, I would recommend upgrading to the newest version, as we have rewritten the SharedBuffer implementation in 1.6.0.

Best,

Dawid


On 26/09/18 13:50, Shailesh Jain wrote:
Hi,

I think I've hit this same issue on a 3 node standalone cluster (1.4.2) using HDFS (2.8.4) as state backend.

2018-09-26 17:07:39,370 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340).
2018-09-26 17:07:39,370 INFO  org.apache.flink.runtime.taskmanager.Task                     - SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 6 for operator SelectCepOperator (1/1).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 6 for operator SelectCepOperator (1/1).
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
    ... 5 more
    Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
        ... 5 more
    Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
        at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
        ... 7 more
    Caused by: java.lang.NullPointerException
        at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:954)
        at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:825)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeMappingsInKeyGroup(CopyOnWriteStateTableSnapshot.java:196)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:390)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:339)
        at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
        ... 5 more
    [CIRCULAR REFERENCE:java.lang.NullPointerException]

Any ideas on why I'm hitting this, especially when this (https://issues.apache.org/jira/browse/FLINK-7756) says it has been fixed in 1.4.2?

On Sat, Nov 4, 2017 at 12:34 AM Federico D'Ambrosio <[hidden email]> wrote:
Thank you very much for your steady response, Kostas!

Cheers,
Federico

2017-11-03 16:26 GMT+01:00 Kostas Kloudas <[hidden email]>:
Hi Federico,

Thanks for trying it out! 
Great to hear that your problem was fixed!

The feature freeze for the release is going to be next week, and I would expect 1 or 2 more weeks testing.
So I would say in 2.5 weeks. But this is of course subject to potential issues we may find during testing.

Cheers,
Kostas

On Nov 3, 2017, at 4:22 PM, Federico D'Ambrosio <[hidden email]> wrote:

Hi Kostas,

I just tried running the same job with 1.4-SNAPSHOT for 10 minutes and it didn't crash, so it was the same underlying issue as the JIRA you linked.

Do you happen to know when the 1.4 stable release is expected?

Thank you very much,
Federico

2017-11-03 15:25 GMT+01:00 Kostas Kloudas <[hidden email]>:
Perfect! thanks a lot!

Kostas

On Nov 3, 2017, at 3:23 PM, Federico D'Ambrosio <[hidden email]> wrote:

Hi Kostas,

yes, I'm using 1.3.2. I'll try the current master and I'll get back to you.

2017-11-03 15:21 GMT+01:00 Kostas Kloudas <[hidden email]>:
Hi Federico,

I assume that you are using Flink 1.3, right?

In this case, in 1.4 we have fixed a bug that seems similar to your case:

Could you try the current master to see if it fixes your problem?

Thanks,
Kostas

On Nov 3, 2017, at 3:12 PM, Federico D'Ambrosio <[hidden email]> wrote:

 Could not find id for entry:                                                        




--
Federico D'Ambrosio








Re: FlinkCEP, circular references and checkpointing failures

Shailesh Jain
Thanks a lot for looking into this issue Stefan.

Could you please let me know the issue ID once you open it? It'll help me understand the problem better, and also I could do a quick test in our environment once the issue is resolved.

Thanks,
Shailesh

On Wed, Nov 7, 2018, 10:46 PM Till Rohrmann <[hidden email]> wrote:
Really good finding Stefan!

On Wed, Nov 7, 2018 at 5:28 PM Stefan Richter <[hidden email]> wrote:
Hi,

I think I can already spot the problem: LockableTypeSerializer.duplicate() is not properly implemented because it also has to call duplicate() on the element serialiser that is passed into the constructor of the new instance. I will open an issue and fix the problem.

Best,
Stefan
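
To illustrate the kind of bug described above, here is a minimal sketch in plain Java. It does not use Flink's real classes: `Ser`, `StatefulSer`, `SharingWrapper` and `DeepWrapper` are hypothetical stand-ins, and the assumption is only that a wrapping serializer holds a stateful element serializer (as a Kryo-based one would be). If the wrapper's duplicate() passes the same element instance to the new wrapper, two threads (for example task processing and an asynchronous checkpoint) may end up sharing mutable serializer state.

```java
/** Hypothetical stand-in for a serializer contract that offers duplicate(). */
interface Ser<T> {
    T copy(T value);
    Ser<T> duplicate();
}

/** Stateful serializer: the scratch buffer makes an instance unsafe to share across threads. */
final class StatefulSer implements Ser<String> {
    private final StringBuilder scratch = new StringBuilder();

    @Override
    public String copy(String value) {
        scratch.setLength(0);      // mutates per-instance state
        scratch.append(value);
        return scratch.toString();
    }

    @Override
    public StatefulSer duplicate() {
        return new StatefulSer();  // fresh state for the new owner
    }
}

/** The problematic pattern: duplicate() hands the SAME element serializer to the new wrapper. */
final class SharingWrapper<T> implements Ser<T> {
    final Ser<T> element;

    SharingWrapper(Ser<T> element) { this.element = element; }

    @Override
    public T copy(T value) { return element.copy(value); }

    @Override
    public SharingWrapper<T> duplicate() { return new SharingWrapper<>(element); }
}

/** The pattern described as the fix: duplicate() also duplicates the element serializer. */
final class DeepWrapper<T> implements Ser<T> {
    final Ser<T> element;

    DeepWrapper(Ser<T> element) { this.element = element; }

    @Override
    public T copy(T value) { return element.copy(value); }

    @Override
    public DeepWrapper<T> duplicate() { return new DeepWrapper<>(element.duplicate()); }
}

final class DuplicateSketch {
    public static void main(String[] args) {
        StatefulSer inner = new StatefulSer();
        SharingWrapper<String> shared = new SharingWrapper<>(inner).duplicate();
        DeepWrapper<String> deep = new DeepWrapper<>(inner).duplicate();
        System.out.println("sharing wrapper reuses inner: " + (shared.element == inner)); // true
        System.out.println("deep wrapper reuses inner: " + (deep.element == inner));      // false
    }
}
```

Whether the actual fix in Flink looks exactly like `DeepWrapper` is not confirmed here; the sketch only shows why a shallow duplicate() defeats the purpose of duplicating a stateful serializer.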

On 7. Nov 2018, at 17:17, Till Rohrmann <[hidden email]> wrote:

Hi Shailesh,

could you maybe provide us with an example program that reproduces this problem? This would help the community debug it. It does not look right and might point towards a bug in Flink. Thanks a lot!

Cheers,
Till

On Tue, Oct 30, 2018 at 9:10 AM Dawid Wysakowicz <[hidden email]> wrote:

This is a problem with serializing your events using Kryo. I'm adding Gordon to cc, as he was recently working with serializers. He might give you more insight into what is going wrong.

Best,

Dawid

On 25/10/2018 05:41, Shailesh Jain wrote:
Hi Dawid,

I've upgraded to Flink 1.6.1 and rebased my changes against the tag 1.6.1; the only commit on top of 1.6 is this: https://github.com/jainshailesh/flink/commit/797e3c4af5b28263fd98fb79daaba97cabf3392c

I ran two separate identical jobs (with and without checkpointing enabled). I'm hitting an ArrayIndexOutOfBoundsException (and sometimes an NPE) only when checkpointing (HDFS backend) is enabled, with the stack trace below.

I did see a similar problem with different operators here (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/IndexOutOfBoundsException-on-deserialization-after-updating-to-1-6-1-td23933.html). Is this a known issue which is getting addressed?

Any ideas on what could be causing this?

Thanks,
Shailesh


2018-10-24 17:04:13,365 INFO  org.apache.flink.runtime.taskmanager.Task                     - SelectCepOperatorMixedTime (1/1) - SelectCepOperatorMixedTime (1/1) (3d984b7919342a3886593401088ca2cd) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
        at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
        at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
        at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
        at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
        at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
        at org.apache.flink.cep.operator.AbstractKeyedCEPPatternMixedTimeApproachOperator.processElement(AbstractKeyedCEPPatternMixedTimeApproachOperator.java:45)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.WrappingRuntimeException: java.lang.ArrayIndexOutOfBoundsException: -1
        at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:305)
        at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
        at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:301)
        at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:291)
        at org.apache.flink.cep.nfa.NFA$ConditionContext.getEventsForPattern(NFA.java:811)
        at com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:70)
        at com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:62)
        at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
        at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
        ... 10 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
        at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
        at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
        at org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:120)
        at org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:95)
        at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:113)
        at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:49)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
        at org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
        at org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
        at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:303)
        ... 18 more
