Hi,
I am new to Flink and am using Flink CEP to detect the absence of an event. I defined the pattern stream below, using RMQSource to ingest events, and ran the job on a local Flink cluster. I tested with the default configuration, except that I changed the task manager slot count to 2 in flink-conf.yaml. The test generates 10K events with 10K keys every minute, and I ran it with G1GC on Java 1.8.

With G1GC, memory usage stays under control, but the number of young-generation garbage collections is very high, and young-generation GC takes about 1 minute.

Can the number of young-generation GCs be reduced by any cleanup in the CEP pattern stream operator? Is this JIRA issue related to it: https://issues.apache.org/jira/browse/FLINK-6032 ?

Any help would be appreciated.

//My Test Code
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

RMQSource<CheckInRequest> rmqSource = new RMQSource<CheckInRequest>(
        connectionConfig,
        "checkinQueue",
        true,
        new CheckInDeserializationScheme());

DataStream<CheckInRequest> inputEventStream = env
        .addSource(rmqSource)
        .assignTimestampsAndWatermarks(new IngestionTimeExtractor<CheckInRequest>());

//Continuously prints the input events
inputEventStream.print();

//Two consecutive check-ins for the same key within 360 seconds
Pattern<CheckInRequest, ?> checkinEventPattern = Pattern.<CheckInRequest>begin("first")
        .subtype(CheckInRequest.class)
        .next("second")
        .subtype(CheckInRequest.class)
        .within(Time.seconds(360));

PatternStream<CheckInRequest> patternStream = CEP.pattern(
        inputEventStream.keyBy(new KeySelector<CheckInRequest, String>() {
            public String getKey(CheckInRequest value) throws Exception {
                return value.getSerialNum();
            }
        }),
        checkinEventPattern);

//Left: timed-out "first" event (no second check-in arrived); Right: matched pattern
DataStream<Either<CheckInRequest, String>> result = CEP.pattern(
        inputEventStream.keyBy(new KeySelector<CheckInRequest, String>() {
            public String getKey(CheckInRequest value) throws Exception {
                return value.getSerialNum();
            }
        }),
        checkinEventPattern)
    .select(
        new PatternTimeoutFunction<CheckInRequest, CheckInRequest>() {
            public CheckInRequest timeout(Map<String, CheckInRequest> pattern, long timeoutTimestamp) throws Exception {
                return pattern.get("first");
            }
        },
        new PatternSelectFunction<CheckInRequest, String>() {
            public String select(Map<String, CheckInRequest> pattern) {
                StringBuilder builder = new StringBuilder();
                builder.append(pattern.get("first").toString());
                System.out.println("select function:"
                        + pattern.get("first").getSerialNum() + ":"
                        + pattern.get("first").getEventTime() + ":"
                        + System.currentTimeMillis());
                return builder.toString();
            }
        });

Thanks,
Abirami