I am running Flink 1.3.0 against Kafka 0.10. I managed to bring the flink cluster up and have been running my flink CEP job for more than 3 hours when I see the following exception : The messages consumed from Kafka are protobuf messages and I use a protobuf serializer. i have no clue as to where is this exception coming from. Can someone help?java.lang.IllegalStateException: Could not initialize keyed state backend. at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:675) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:662) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.StreamCorruptedException: invalid type code: 00 at java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2828) at java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2862) at java.io.ObjectInputStream$BlockDataInputStream.skipBlockData(ObjectInputStream.java:2764) at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:2196) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1838) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1203) at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1161) at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:948) at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:839) at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:473) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:354) at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:771) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311) ... 6 more |
Hi Sridhar,
Sorry that this didn't get a response earlier. According to the trace, it seems like the job failed during the process, and when trying to automatically restore from a checkpoint, deserialization of a CEP `IterativeCondition` object failed. As far as I can tell, CEP operators are just using Java serialization on CEP `IterativeCondition` objects, so should not be related to the protobuf serializer that you are using. Is this still constantly happening for you? Cheers, Gordon -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
Thanks for the reply. Well, tracing back to the root cause, I see the following: 1. At the Job manager, the Checkpoint times are getting worse :Jobmanager : Checkpoint times are getting worse progressively. 2017-09-16 05:05:50,813 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 @ 1505538350809 2017-09-16 05:05:51,396 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1 (11101233 bytes in 586 ms). 2017-09-16 05:07:30,809 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 2 @ 1505538450809 2017-09-16 05:07:31,657 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 2 (18070955 bytes in 583 ms). . . . . . . . . . . . . . 2017-09-16 07:32:58,117 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 89 (246125113 bytes in 27194 ms). 2017-09-16 07:34:10,809 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 90 @ 1505547250809 2017-09-16 07:34:44,932 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 90 (248272325 bytes in 34012 ms). 2017-09-16 07:35:50,809 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 91 @ 1505547350809 2017-09-16 07:36:37,058 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 91 (250348812 bytes in 46136 ms). 2017-09-16 07:37:30,809 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 92 @ 1505547450809 2017-09-16 07:38:18,076 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 92 (252399724 bytes in 47152 ms). 2017-09-16 07:39:10,809 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 93 @ 1505547550809 2017-09-16 07:40:13,494 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 93 (254374636 bytes in 62573 ms). 2017-09-16 07:40:50,809 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 94 @ 1505547650809 2017-09-16 07:42:42,850 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 94 (256386533 bytes in 111898 ms). 2017-09-16 07:42:42,850 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 95 @ 1505547762850 2017-09-16 07:46:06,241 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 95 (258441766 bytes in 203268 ms). 2017-09-16 07:46:06,241 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 96 @ 1505547966241 2017-09-16 07:48:42,069 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - KeyedCEPPatternOperator -> Map (1/4) (ff835faa9eb9182ed2f2230a1e5cc56d) switched from RUNNING to FAILED. AsynchronousException{java.lang.Exception: Could not materialize checkpoint 96 for operator KeyedCEPPatternOperator -> Map (1/4).} at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.Exception: Could not materialize checkpoint 96 for operator KeyedCEPPatternOperator -> Map (1/4). ... 6 more Caused by: java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: GC overhead limit exceeded at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897) ... 5 more On Thu, Sep 21, 2017 at 7:29 PM, Tzu-Li (Gordon) Tai <[hidden email]> wrote: Hi Sridhar, |
I talked a bit with Kostas on what may be happening here. It could be that your patterns are not closing, which depends on the pattern construction of your CEP job. Could you perhaps provide an overview / code snippet of what your CEP job is doing? Looping Kostas (in CC) also to this thread as he may have a better idea what is happening here. Gordon
On 22 September 2017 at 4:09:07 PM, Sridhar Chellappa ([hidden email]) wrote:
|
Here is the snippet : public interface Rule { DataStream<Alert> run(); } public class Rule1 implements Rule { private static final String RULE_ID = "Rule1" @Override public DataStream<Alert> run() { Pattern<MyMessage1, ?> MyMessage1Pattern = Pattern.<MyMessage1>begin("first"). subtype(MyMessage1.class). next("second"). subtype(MyMessage1.class). within(Time.minutes(15); PatternStream<MyMessage1> MyMessage1PatternStream = CEP.pattern( MyMessage1DataStream.keyBy("field1", "field2"), MyMessage1Pattern ); return (MyMessage1PatternStream.select( new PatternSelectFunction<MyMessage1, Alert>() { @Override public Alert select(Map<String, List<MyMessage1>> pattern) throws Exception { String alertMessage = String.format("Cep Alert. Rule ID : %s" RULE_ID); return new CEPAlert(alertMessage); } } ) ); } private static List<Rule> getStream1RulesToExecute(DataStream<MyMessage1> MyMessage1DataStream) { List<Rule> rules = new ArrayList<Rule>(); rules.add(new Rule1(MyMessage1DataStream)); return rules; } private static List<Rule> getStream2RulesToExecute(DataStream<MyMessage1> MyMessage1DataStream) { List<Rule> rules = new ArrayList<Rule>(); rules.add(new Rule2(MyMessage1DataStream)); return rules; } public RichParallelSourceFunction<MyMessage1> getStreamSource1(StreamExecutionEnvironment env, ParameterTool parameterTool) { env.enableCheckpointing(parameterTool.getInt(CHECKPOINT_INTERVAL, DEFAULT_CHECKPOINT_INTERVAL)); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS); env.getCheckpointConfig().setCheckpointTimeout(CheckpointConfig.DEFAULT_TIMEOUT); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); KafkaDataSource<T> flinkCepConsumer = new KafkaDataSource<MyMessage1>(parameterTool, new MyMessage1SerDeSchema()); return flinkCepConsumer; } public RichParallelSourceFunction<MyMessage2> getStreamSource2(StreamExecutionEnvironment env, ParameterTool parameterTool) { env.enableCheckpointing(parameterTool.getInt(CHECKPOINT_INTERVAL, DEFAULT_CHECKPOINT_INTERVAL)); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS); env.getCheckpointConfig().setCheckpointTimeout(CheckpointConfig.DEFAULT_TIMEOUT); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); KafkaDataSource<T> flinkCepConsumer = new KafkaDataSource<MyMessage1>(parameterTool, new MyMessage2SerDeSchema()); return flinkCepConsumer; } public static void main(String[] args) throws Exception { ParameterTool parameterTool = ParameterTool.fromPropertiesFile(args[PROPS_FILE_ARG_INDEX]); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(parameterTool); DataStream<MyMessage1> message1Stream = env.addSource( getStreamSource1(env, parameterTool); ); DataStream<MyMessge2> message2Stream = env.addSource( getStreamSource2(env, parameterTool); ); getStream1RulesToExecute(message1Stream).forEach(rule -> rule.run().print()); getStream2RulesToExecute(message2tream).forEach(rule -> rule.run().print()); env.execute(STREAMING_JOB_NAME); } On Mon, Sep 25, 2017 at 3:13 PM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
|
One more point to add. I disabled checkpoints (by commenting out code that calls enableCheckpointing()) and re-ran the job this time with plenty of memory to the job manager~/flink-1.3.2/bin/yarn-session.sh -n 4 -jm 24576 -tm 24576 -s 2 -d 2017-09-25 06:46:44,066 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - Starting YARN ApplicationMaster / ResourceManager / JobManager (Version: 1.3.2, Rev:0399bee, Date:03.08.2017 @ 10:23:11 UTC) 2017-09-25 06:46:44,066 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - Current user: flink 2017-09-25 06:46:44,066 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.131-b11 2017-09-25 06:46:44,066 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - Maximum heap size: 16384 MiBytes 2017-09-25 06:46:44,066 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64 2017-09-25 06:46:44,067 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - Hadoop version: 2.7.2 2017-09-25 06:46:44,067 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - JVM Options: 2017-09-25 06:46:44,067 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - -Xmx18432m 2017-09-25 06:46:44,067 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - -Dlog.file=/var/log/hadoop-yarn/userlogs/application_1506317793012_0001/container_1506317793012_0001_01_000001/jobmanager.log 2017-09-25 06:46:44,067 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - -Dlogback.configurationFile=file:logback.xml 2017-09-25 06:46:44,067 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - -Dlog4j.configuration=file:log4j.properties 2017-09-25 06:46:44,067 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - Program Arguments: (none) . . 2017-09-25 06:50:51,925 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> (Filter -> Map -> Map, Filter -> Map -> Map, Filter -> Map -> Map, Map -> (Filter, Filter)) (2/2) (e27860984c858738f044931e4b6a86a6) switched from DEPLOYING to RUNNING. 2017-09-25 13:38:54,175 INFO org.apache.flink.runtime.blob.BlobCache - Created BLOB cache storage directory /tmp/blobStore-3e0b96a1-904b-4acb-b0d3-9d88f2073e97 2017-09-25 13:38:54,187 INFO org.apache.flink.runtime.blob.BlobCache - Downloading 49efe0ad58b727ba145b86df6088111c9a90ddd6 from localhost/127.0.0.1:55550 2017-09-25 16:30:39,974 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - KeyedCEPPatternOperator -> Map (2/2) (e464ec796cd239a7b7fa225aaf86309a) switched from RUNNING to CANCELED. 2017-09-25 16:30:39,975 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> (Filter -> Map -> Map, Filter -> Map -> Map, Filter -> Map -> Map, Map -> (Filter, Filter)) (2/2) (e27860984c858738f044931e4b6a86a6) switched from RUNNING to FAILED. java.lang.OutOfMemoryError: GC overhead limit exceeded at sun.reflect.GeneratedConstructorAccessor10.newInstance(Unknown Source) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at com.twitter.chill.Instantiators$$anonfun$normalJava$1.apply(KryoBase.scala:160) at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:123) at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061) at com.esotericsoftware.kryo.serializers.FieldSerializer.createCopy(FieldSerializer.java:620) at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:624) at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176) at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:239) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483) at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575) at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869) at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483) at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575) at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891) On Wed, Sep 27, 2017 at 8:34 AM, Sridhar Chellappa <[hidden email]> wrote:
|
Hi Sridhar,
From looking at your code: 1) The “KafkaDataSource” is a custom source that you implemented? Does this source buffer anything? 2) The getStreamSource2 seems to return again a "new KafkaDataSource<MyMessage1>”. Can this be a problem? 3) You are working on processing time and you are simply detecting if 2 messages of the same type came within 15min right? I suppose that this could also be implemented using the times() quantifier, but this is just a matter of taste. Could you reduce this to a smaller duration and see if you still get a corrupted stream exception? Thanks, Kostas
|
Free forum by Nabble | Edit this page |