Here is the snippet:
public interface Rule {
    DataStream<Alert> run();
}
public class Rule1 implements Rule {
    private static final String RULE_ID = "Rule1";

    // Input stream, passed in by getStream1RulesToExecute().
    private final DataStream<MyMessage1> MyMessage1DataStream;

    public Rule1(DataStream<MyMessage1> MyMessage1DataStream) {
        this.MyMessage1DataStream = MyMessage1DataStream;
    }

    @Override
    public DataStream<Alert> run() {
        // Two consecutive MyMessage1 events for the same key within 15 minutes.
        Pattern<MyMessage1, ?> MyMessage1Pattern =
            Pattern.<MyMessage1>begin("first")
                .subtype(MyMessage1.class)
                .next("second")
                .subtype(MyMessage1.class)
                .within(Time.minutes(15));

        PatternStream<MyMessage1> MyMessage1PatternStream =
            CEP.pattern(
                MyMessage1DataStream.keyBy("field1", "field2"),
                MyMessage1Pattern
            );

        return MyMessage1PatternStream.select(
            new PatternSelectFunction<MyMessage1, Alert>() {
                @Override
                public Alert select(Map<String, List<MyMessage1>> pattern) throws Exception {
                    String alertMessage = String.format("Cep Alert. Rule ID : %s", RULE_ID);
                    return new CEPAlert(alertMessage);
                }
            }
        );
    }
}
private static List<Rule> getStream1RulesToExecute(DataStream<MyMessage1> MyMessage1DataStream) {
    List<Rule> rules = new ArrayList<Rule>();
    rules.add(new Rule1(MyMessage1DataStream));
    return rules;
}

private static List<Rule> getStream2RulesToExecute(DataStream<MyMessage2> MyMessage2DataStream) {
    List<Rule> rules = new ArrayList<Rule>();
    rules.add(new Rule2(MyMessage2DataStream));
    return rules;
}
// Static so that it can be called from main().
public static RichParallelSourceFunction<MyMessage1> getStreamSource1(StreamExecutionEnvironment env, ParameterTool parameterTool) {
    env.enableCheckpointing(parameterTool.getInt(CHECKPOINT_INTERVAL, DEFAULT_CHECKPOINT_INTERVAL));
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS);
    env.getCheckpointConfig().setCheckpointTimeout(CheckpointConfig.DEFAULT_TIMEOUT);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    KafkaDataSource<MyMessage1> flinkCepConsumer =
        new KafkaDataSource<MyMessage1>(parameterTool, new MyMessage1SerDeSchema());
    return flinkCepConsumer;
}

public static RichParallelSourceFunction<MyMessage2> getStreamSource2(StreamExecutionEnvironment env, ParameterTool parameterTool) {
    env.enableCheckpointing(parameterTool.getInt(CHECKPOINT_INTERVAL, DEFAULT_CHECKPOINT_INTERVAL));
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS);
    env.getCheckpointConfig().setCheckpointTimeout(CheckpointConfig.DEFAULT_TIMEOUT);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    KafkaDataSource<MyMessage2> flinkCepConsumer =
        new KafkaDataSource<MyMessage2>(parameterTool, new MyMessage2SerDeSchema());
    return flinkCepConsumer;
}
public static void main(String[] args) throws Exception {
    ParameterTool parameterTool = ParameterTool.fromPropertiesFile(args[PROPS_FILE_ARG_INDEX]);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setGlobalJobParameters(parameterTool);
    DataStream<MyMessage1> message1Stream = env.addSource(
        getStreamSource1(env, parameterTool)
    );
    DataStream<MyMessage2> message2Stream = env.addSource(
        getStreamSource2(env, parameterTool)
    );
    getStream1RulesToExecute(message1Stream).forEach(rule -> rule.run().print());
    getStream2RulesToExecute(message2Stream).forEach(rule -> rule.run().print());
    env.execute(STREAMING_JOB_NAME);
}
On Mon, Sep 25, 2017 at 3:13 PM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
I talked a bit with Kostas on what may be happening here.
It could be that your patterns are not closing, which depends on the pattern construction of your CEP job.
Could you perhaps provide an overview / code snippet of what your CEP job is doing?
Looping Kostas (in CC) also to this thread as he may have a better idea what is happening here.
Cheers,
Gordon
On 22 September 2017 at 4:09:07 PM, Sridhar Chellappa ([hidden email]) wrote:
So, it looks like the Job Manager ran out of memory, thanks to the "Progressively Getting Worse" checkpoints. Any ideas on how to make the checkpoints faster?
Thanks for the reply. Well, tracing back to the root cause, I see the following:
1. At the Job Manager, the checkpoint times are getting worse:
Jobmanager:
Checkpoint times are getting worse progressively.
2017-09-16 05:05:50,813 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 @ 1505538350809
2017-09-16 05:05:51,396 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1 (11101233 bytes in 586 ms).
2017-09-16 05:07:30,809 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 2 @ 1505538450809
2017-09-16 05:07:31,657 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 2 (18070955 bytes in 583 ms).
...
2017-09-16 07:32:58,117 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 89 (246125113 bytes in 27194 ms).
2017-09-16 07:34:10,809 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 90 @ 1505547250809
2017-09-16 07:34:44,932 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 90 (248272325 bytes in 34012 ms).
2017-09-16 07:35:50,809 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 91 @ 1505547350809
2017-09-16 07:36:37,058 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 91 (250348812 bytes in 46136 ms).
2017-09-16 07:37:30,809 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 92 @ 1505547450809
2017-09-16 07:38:18,076 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 92 (252399724 bytes in 47152 ms).
2017-09-16 07:39:10,809 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 93 @ 1505547550809
2017-09-16 07:40:13,494 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 93 (254374636 bytes in 62573 ms).
2017-09-16 07:40:50,809 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 94 @ 1505547650809
2017-09-16 07:42:42,850 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 94 (256386533 bytes in 111898 ms).
2017-09-16 07:42:42,850 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 95 @ 1505547762850
2017-09-16 07:46:06,241 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 95 (258441766 bytes in 203268 ms).
2017-09-16 07:46:06,241 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 96 @ 1505547966241
2017-09-16 07:48:42,069 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - KeyedCEPPatternOperator -> Map (1/4) (ff835faa9eb9182ed2f2230a1e5cc56d) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 96 for operator KeyedCEPPatternOperator -> Map (1/4).}
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 96 for operator KeyedCEPPatternOperator -> Map (1/4).
... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
... 5 more
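To put numbers on the trend in the log above, the size and duration can be pulled out of each "Completed checkpoint" line. The following is only a rough sketch and not part of the job: the class name CheckpointLogStats and its helpers are made up for illustration, and it assumes the CheckpointCoordinator line format shown in the excerpt.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for reading the log excerpt; not part of Flink or of the job.
class CheckpointLogStats {

    /** Id, size and duration of one completed checkpoint. */
    static final class Completed {
        final long id;
        final long bytes;
        final long millis;

        Completed(long id, long bytes, long millis) {
            this.id = id;
            this.bytes = bytes;
            this.millis = millis;
        }
    }

    // Matches e.g. "Completed checkpoint 95 (258441766 bytes in 203268 ms)".
    private static final Pattern COMPLETED =
        Pattern.compile("Completed checkpoint (\\d+) \\((\\d+) bytes in (\\d+) ms\\)");

    /** Returns one entry per "Completed checkpoint" line; all other lines are skipped. */
    static List<Completed> parse(List<String> lines) {
        List<Completed> result = new ArrayList<>();
        for (String line : lines) {
            Matcher m = COMPLETED.matcher(line);
            if (m.find()) {
                result.add(new Completed(
                    Long.parseLong(m.group(1)),
                    Long.parseLong(m.group(2)),
                    Long.parseLong(m.group(3))));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Three lines copied from the JobManager log in this thread.
        List<String> lines = Arrays.asList(
            "2017-09-16 05:05:51,396 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1 (11101233 bytes in 586 ms).",
            "2017-09-16 07:42:42,850 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 95 @ 1505547762850",
            "2017-09-16 07:46:06,241 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 95 (258441766 bytes in 203268 ms).");
        for (Completed c : parse(lines)) {
            System.out.println("checkpoint " + c.id + ": " + c.bytes + " bytes, " + c.millis + " ms");
        }
    }
}
```

In the excerpt, checkpoint 1 is 11101233 bytes in 586 ms and checkpoint 95 is 258441766 bytes in 203268 ms, so the duration grows much faster than the state size.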
On Thu, Sep 21, 2017 at 7:29 PM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi Sridhar,
Sorry that this didn't get a response earlier.
According to the trace, it seems like the job failed during the process, and
when trying to automatically restore from a checkpoint, deserialization of a
CEP `IterativeCondition` object failed. As far as I can tell, CEP operators
are just using Java serialization on CEP `IterativeCondition` objects, so
should not be related to the protobuf serializer that you are using.
Is this still constantly happening for you?
Cheers,
Gordon
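As a rough illustration of the point about Java serialization: an object stored this way is written with ObjectOutputStream and read back with ObjectInputStream, so restoring it depends on its class being resolvable and compatible at restore time, independent of whichever serializer handles the stream records. The Condition interface and MinLength class below are made-up stand-ins, not Flink's `IterativeCondition`, and the sketch does not reproduce what the CEP operator does internally.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical demo class; none of these types come from Flink.
class ConditionSerializationDemo {

    /** Stand-in for a user-defined condition that must survive a checkpoint. */
    interface Condition<T> extends Serializable {
        boolean filter(T value);
    }

    /** Example condition: accepts strings with at least `min` characters. */
    static final class MinLength implements Condition<String> {
        private static final long serialVersionUID = 1L;
        private final int min;

        MinLength(int min) {
            this.min = min;
        }

        @Override
        public boolean filter(String value) {
            return value.length() >= min;
        }
    }

    /** Writes the object with plain Java serialization. */
    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    /** Reads the object back; fails if the class cannot be resolved or is incompatible. */
    @SuppressWarnings("unchecked")
    static <T> T deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Condition<String> restored = deserialize(serialize(new MinLength(3)));
        System.out.println(restored.filter("flink")); // prints "true"
    }
}
```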
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/