Hello everyone,

I've been experimenting a bit with FlinkCEP and I'm noticing weird failures when the window of a within clause closes at the same time a checkpoint occurs (synchronous checkpoints, tried with both the Fs and RocksDB state backends, stored in HDFS). This is my setup:

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(60000) // Checkpoints every minute
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints-dir"))

    val pattern = Pattern
      .begin[EventWithId]("flying").oneOrMore.where(_.event.instantValues.altitude >= 37000)
      .notNext("disappearing").where(_.event.instantValues.altitude >= 37000)
      .within(Time.minutes(1))

    // Associate KeyedStream with pattern to be detected
    val patternStream = CEP.pattern(streamById, pattern)

This causes a failure on the second checkpoint, with the following exception stack trace:

AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
    ... 5 more
    Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
        ... 5 more
    Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
        at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
        ... 7 more
    Caused by: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
        at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
        at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:971)
        at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:838)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:928)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:852)
        at org.apache.flink.runtime.state.heap.NestedMapsStateTable$NestedMapsStateTableSnapshot.writeMappingsInKeyGroup(NestedMapsStateTable.java:355)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:347)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329)
        at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:372)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
        at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
        at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:228)
        at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        ... 1 more
    [CIRCULAR REFERENCE:java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)]
11/03/2017 13:46:46 Job execution switched to status FAILING.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
    ... 5 more
    Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
        ... 5 more
    Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
        at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
        ... 7 more
    Caused by: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
        at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
        at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:971)
        at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:838)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:928)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:852)
        at org.apache.flink.runtime.state.heap.NestedMapsStateTable$NestedMapsStateTableSnapshot.writeMappingsInKeyGroup(NestedMapsStateTable.java:355)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:347)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329)
        at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:372)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
        at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
        at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:228)
        at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        ... 1 more
    [CIRCULAR REFERENCE:java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)]
11/03/2017 13:46:46 Source: json-parser(1/1) switched to CANCELING
11/03/2017 13:46:46 Process(1/1) switched to CANCELING
11/03/2017 13:46:46 Source: json-parser(1/1) switched to CANCELED
11/03/2017 13:46:46 Process(1/1) switched to CANCELED
11/03/2017 13:46:46 Job execution switched to status RESTARTING.
11/03/2017 13:46:56 Job execution switched to status CREATED.
11/03/2017 13:46:56 Job execution switched to status RUNNING.
11/03/2017 13:46:56 Source: json-parser(1/1) switched to SCHEDULED
11/03/2017 13:46:56 Process(1/1) switched to SCHEDULED
11/03/2017 13:46:56 KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to SCHEDULED
11/03/2017 13:46:56 Source: json-parser(1/1) switched to DEPLOYING
11/03/2017 13:46:56 Process(1/1) switched to DEPLOYING
11/03/2017 13:46:56 KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to DEPLOYING
11/03/2017 13:46:56 Process(1/1) switched to RUNNING
11/03/2017 13:46:56 KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to RUNNING
11/03/2017 13:46:56 Source: json-parser(1/1) switched to RUNNING
11/03/2017 13:46:57 KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to FAILED
java.lang.IllegalStateException: Could not initialize keyed state backend.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.StreamCorruptedException: invalid type code: 00
    at java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2519)
    at java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2553)
    at java.io.ObjectInputStream$BlockDataInputStream.skipBlockData(ObjectInputStream.java:2455)
    at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1951)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1621)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
    at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
    at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
    at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:852)
    at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:518)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
    ... 6 more
11/03/2017 13:46:57 Job execution switched to status FAILING.
java.lang.IllegalStateException: Could not initialize keyed state backend.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.StreamCorruptedException: invalid type code: 00
    at java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2519)
    at java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2553)
    at java.io.ObjectInputStream$BlockDataInputStream.skipBlockData(ObjectInputStream.java:2455)
    at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1951)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1621)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
    at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
    at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
    at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:852)
    at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:518)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
    ... 6 more

What is happening here? Am I doing something wrong? Is there some sort of conflict between the deadlines of within clauses and checkpoint deadlines?

I found the following similar JIRA pages, but none of them mention circular references:
https://issues.apache.org/jira/browse/FLINK-6321

Kind Regards,
Federico D'Ambrosio
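To make the first error message easier to read, here is a toy model of a buffer serializer that numbers its entries and then writes each edge as the id of its target. It is only a sketch: `DanglingEdgeDemo`, `Entry`, and `serialize` are hypothetical names, and this is not Flink's actual `SharedBuffer` code. It shows one way a "Could not find id for entry" check can trip, namely when an entry has been removed from the buffer but another entry still holds an edge to it. Whether that is what happens in the job above is not established by the trace.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model; NOT Flink's SharedBuffer implementation.
class DanglingEdgeDemo {

    /** A buffered event plus edges pointing at other entries. */
    static final class Entry {
        final String event;
        final List<Entry> edges = new ArrayList<>();

        Entry(String event) {
            this.event = event;
        }

        @Override
        public String toString() {
            return "Entry(" + event + ")";
        }
    }

    /** Writes every entry as "id:event->[targetIds];"; each edge target needs an id. */
    static String serialize(List<Entry> buffer) {
        // Pass 1: assign a numeric id to every entry that is still in the buffer.
        Map<Entry, Integer> ids = new IdentityHashMap<>();
        for (Entry e : buffer) {
            ids.put(e, ids.size());
        }
        // Pass 2: write each entry, replacing edge targets by their ids.
        StringBuilder out = new StringBuilder();
        for (Entry e : buffer) {
            out.append(ids.get(e)).append(':').append(e.event).append("->[");
            for (int i = 0; i < e.edges.size(); i++) {
                Entry target = e.edges.get(i);
                Integer targetId = ids.get(target);
                if (targetId == null) {
                    // The edge points at an entry that was never numbered.
                    throw new IllegalStateException("Could not find id for entry: " + target);
                }
                if (i > 0) {
                    out.append(',');
                }
                out.append(targetId);
            }
            out.append("];");
        }
        return out.toString();
    }

    public static void main(String[] args) {
        Entry first = new Entry("a");
        Entry second = new Entry("b");
        second.edges.add(first);
        List<Entry> buffer = new ArrayList<>(Arrays.asList(first, second));

        System.out.println(serialize(buffer)); // consistent buffer serializes fine

        // Simulate a cleanup step that drops an entry but leaves the edge behind.
        buffer.remove(first);
        try {
            serialize(buffer);
        } catch (IllegalStateException ex) {
            System.out.println("snapshot failed: " + ex.getMessage());
        }
    }
}
```

In this model the failure only appears at serialization time, which would match a job that runs normally and fails only when a checkpoint is taken.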
I'm sorry, I realized that the stack trace was poorly formatted; here it is with better formatting:

AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
    ... 5 more
    Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
        ... 5 more
    Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
        at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
        ... 7 more
    Caused by: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
        at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
        at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:971)
        at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:838)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:928)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:852)
        at org.apache.flink.runtime.state.heap.NestedMapsStateTable$NestedMapsStateTableSnapshot.writeMappingsInKeyGroup(NestedMapsStateTable.java:355)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:347)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329)
        at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:372)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
        at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
        at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:228)
        at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        ... 1 more
    [CIRCULAR REFERENCE:java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)]
11/03/2017 13:46:46 Job execution switched to status FAILING.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
    ... 5 more
    Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
        ... 5 more
    Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
        at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
        ... 7 more
    Caused by: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
        at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
        at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:971)
        at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:838)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:928)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:852)
        at org.apache.flink.runtime.state.heap.NestedMapsStateTable$NestedMapsStateTableSnapshot.writeMappingsInKeyGroup(NestedMapsStateTable.java:355)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:347)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329)
        at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:372)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
        at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
        at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:228)
        at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        ... 1 more
    [CIRCULAR REFERENCE:java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)]
11/03/2017 13:46:46 Source: json-parser(1/1) switched to CANCELING
11/03/2017 13:46:46 Process(1/1) switched to CANCELING
11/03/2017 13:46:46 Source: json-parser(1/1) switched to CANCELED
11/03/2017 13:46:46 Process(1/1) switched to CANCELED
11/03/2017 13:46:46 Job execution switched to status RESTARTING.
11/03/2017 13:46:56 Job execution switched to status CREATED.
11/03/2017 13:46:56 Job execution switched to status RUNNING.
11/03/2017 13:46:56 Source: json-parser(1/1) switched to SCHEDULED
11/03/2017 13:46:56 Process(1/1) switched to SCHEDULED
11/03/2017 13:46:56 KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to SCHEDULED
11/03/2017 13:46:56 Source: json-parser(1/1) switched to DEPLOYING
11/03/2017 13:46:56 Process(1/1) switched to DEPLOYING
11/03/2017 13:46:56 KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to DEPLOYING
11/03/2017 13:46:56 Process(1/1) switched to RUNNING
11/03/2017 13:46:56 KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to RUNNING
11/03/2017 13:46:56 Source: json-parser(1/1) switched to RUNNING
11/03/2017 13:46:57 KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to FAILED
java.lang.IllegalStateException: Could not initialize keyed state backend.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.StreamCorruptedException: invalid type code: 00
    at java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2519)
    at java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2553)
    at java.io.ObjectInputStream$BlockDataInputStream.skipBlockData(ObjectInputStream.java:2455)
    at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1951)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1621)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
    at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
    at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
    at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:852)
    at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:518)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
    ... 6 more

11/03/2017 13:46:57 Job execution switched to status FAILING.
java.lang.IllegalStateException: Could not initialize keyed state backend.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.StreamCorruptedException: invalid type code: 00
    at java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2519)
    at java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2553)
    at java.io.ObjectInputStream$BlockDataInputStream.skipBlockData(ObjectInputStream.java:2455)
    at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1951)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1621)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
    at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
    at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
    at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:852)
    at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:518)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
    ... 6 more

2017-11-03 15:12 GMT+01:00 Federico D'Ambrosio <[hidden email]>: