Apache Flink CEP 1.2: more time spent in G1 GC

Abiramalakshmi Natarajan
Hi,

I am new to Flink. I am using Flink CEP to detect the absence of an event: I defined the following pattern stream and use an RMQSource to ingest events. I ran this job on a local Flink cluster with the default configuration, except that I changed the TaskManager slot count (taskmanager.numberOfTaskSlots) to 2 in flink-conf.yaml.

I generate 10K events with 10K keys every minute. I ran this test with G1 GC on Java 1.8. With G1 GC, memory usage stays under control, but the number of young-generation garbage collections is very high, and young-generation GC takes 1 minute.
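To put numbers on the young-generation GC count and time, one option is the standard `java.lang.management` API. The sketch below is an illustration and not part of the original test: `GcStats` is a hypothetical helper name, and it only reports on the JVM it runs in, so to observe the TaskManager it would have to run inside that JVM (or the same MXBeans would have to be read remotely over JMX). On Java 8 with G1, the collectors are typically reported as "G1 Young Generation" and "G1 Old Generation".

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

/**
 * Hypothetical helper: reports per-collector GC counts and accumulated GC time
 * for the JVM this code runs in.
 */
public class GcStats {

    /** Returns {total collection count, total collection time in ms} across all collectors. */
    public static long[] totals() {
        long count = 0;
        long timeMs = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            // Both getters return -1 when the value is undefined for a collector,
            // so clamp to zero before summing.
            count += Math.max(0, gc.getCollectionCount());
            timeMs += Math.max(0, gc.getCollectionTime());
        }
        return new long[] {count, timeMs};
    }

    public static void main(String[] args) {
        // One line per collector, e.g. the young and old generation collectors under G1
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            System.out.printf("%s: count=%d, time=%d ms%n",
                    gc.getName(), gc.getCollectionCount(), gc.getCollectionTime());
        }
        long[] t = totals();
        System.out.printf("total: count=%d, time=%d ms%n", t[0], t[1]);
    }
}
```

Sampling these values before and after a test run gives the number of collections and the GC time spent during that run.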

- Can the number of young-generation GCs be reduced by some cleanup in the CEP PatternStream operator? Is JIRA https://issues.apache.org/jira/browse/FLINK-6032 related to this?

Any help on this?


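// My test code (Flink 1.2). env, connectionConfig, CheckInRequest and
// CheckInDeserializationScheme are defined elsewhere in my job.
import java.util.Map;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.types.Either;

        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Consume CheckInRequest events from the "checkinQueue" RabbitMQ queue
        RMQSource<CheckInRequest> rmqSource = new RMQSource<CheckInRequest>(
                connectionConfig, "checkinQueue", true, new CheckInDeserializationScheme());

        // Use ingestion time as the event timestamp and generate watermarks from it
        DataStream<CheckInRequest> inputEventStream = env
                .addSource(rmqSource)
                .assignTimestampsAndWatermarks(new IngestionTimeExtractor<CheckInRequest>());

        // Continuously prints the input events
        inputEventStream.print();

        // "first" directly followed by "second" within 360 s; a partial match that
        // does not complete in time is reported through the timeout function
        Pattern<CheckInRequest, ?> checkinEventPattern = Pattern.<CheckInRequest>begin("first")
                .subtype(CheckInRequest.class)
                .next("second")
                .subtype(CheckInRequest.class)
                .within(Time.seconds(360));

        // Key by serial number so each device is matched independently
        PatternStream<CheckInRequest> patternStream = CEP.pattern(
                inputEventStream.keyBy(new KeySelector<CheckInRequest, String>() {
                    @Override
                    public String getKey(CheckInRequest value) throws Exception {
                        return value.getSerialNum();
                    }
                }),
                checkinEventPattern);

        // Left: timed-out "first" events (absence of a second event); Right: completed matches
        DataStream<Either<CheckInRequest, String>> result = patternStream.select(
                new PatternTimeoutFunction<CheckInRequest, CheckInRequest>() {
                    @Override
                    public CheckInRequest timeout(Map<String, CheckInRequest> pattern,
                                                  long timeoutTimestamp) throws Exception {
                        return pattern.get("first");
                    }
                },
                new PatternSelectFunction<CheckInRequest, String>() {
                    @Override
                    public String select(Map<String, CheckInRequest> pattern) {
                        CheckInRequest first = pattern.get("first");
                        System.out.println("select function:" + first.getSerialNum() + ":"
                                + first.getEventTime() + ":" + System.currentTimeMillis());
                        return first.toString();
                    }
                });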
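The pattern above keys events by serial number, treats each event as a possible "first", and reports a timeout when no "second" follows within 360 s. The plain-Java model below is a simplified, hypothetical illustration of that per-key bookkeeping; it is not Flink's NFA implementation, and `AbsenceModel`, `onEvent` and `advanceTime` are made-up names. It assumes that with `next()` each new event completes the key's pending match and then becomes the new pending "first", so at most one partial match per key is held; exactly when Flink 1.2 emits timeouts may differ.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical, single-threaded model of keyed "first -> next second within window" matching. */
public class AbsenceModel {
    private final long windowMillis;
    // Timestamp of the pending "first" event per key (at most one under the stated assumption)
    private final Map<String, Long> pending = new HashMap<>();
    public final List<String> matches = new ArrayList<>();   // keys whose "second" arrived in time
    public final List<String> timeouts = new ArrayList<>();  // keys where the second event was absent

    public AbsenceModel(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Moves time forward and reports every pending key whose window has elapsed. */
    public void advanceTime(long now) {
        Iterator<Map.Entry<String, Long>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (now - e.getValue() > windowMillis) {
                timeouts.add(e.getKey()); // absence detected for this key
                it.remove();              // its state can be dropped
            }
        }
    }

    /** Processes one event for a key at the given timestamp. */
    public void onEvent(String key, long timestamp) {
        advanceTime(timestamp);
        if (pending.containsKey(key)) {
            matches.add(key);             // pending "first" completed by this "second"
        }
        pending.put(key, timestamp);      // this event starts a new partial match
    }

    public int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        AbsenceModel m = new AbsenceModel(360_000L);
        m.onEvent("A", 0L);
        m.onEvent("B", 0L);
        m.onEvent("A", 60_000L);   // completes A's match; A is pending again
        m.advanceTime(500_000L);   // both remaining windows have elapsed
        System.out.println("matches=" + m.matches + " timeouts=" + m.timeouts
                + " pending=" + m.pendingCount());
    }
}
```

In this model, if the same 10K keys recur every minute, roughly one pending entry per key is kept; if keys rarely repeat, pending entries accumulate until their 360 s window elapses.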
               
Thanks,
Abirami