Flink cep checkpoint size

Singh, Mohit

Hi,

 

I am facing an issue with the CEP operator where the checkpoint size keeps increasing even though the pattern is fully matched. I have a stream keyed by a unique user ID, and I want to detect a pattern of products purchased by each user.

Here is the sample stream data:

 

{"user_id":"45eff814-9016-4849-b607-391601e00e97 ","product":"product1","bids":3,"ts":"1622644781243"}
{"user_id":"45eff814-9016-4849-b607-391601e00e97 ","product":"product2","bids":6,"ts":"1622644781245"}
{"user_id":"45eff814-9016-4849-b607-391601e00e97 ","product":"product2","bids":4,"ts":"1622644781247"}
{"user_id":"45eff814-9016-4849-b607-391601e00e97 ","product":"product2","bids":2,"ts":"1622644781247"}
{"user_id":"45eff814-9016-4849-b607-391601e00e97 ","product":"product2","bids":1,"ts":"1622644781248"}
{"user_id":"45eff814-9016-4849-b607-391601e00e97 ","product":"product3","bids":1,"ts":"1622644781248"}
{"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b ","product":"product1","bids":3,"ts":"1622644782235"}
{"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b ","product":"product2","bids":6,"ts":"1622644782236"}
{"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b ","product":"product2","bids":4,"ts":"1622644782236"}
{"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b ","product":"product2","bids":2,"ts":"1622644782237"}
{"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b ","product":"product2","bids":1,"ts":"1622644782238"}
{"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b ","product":"product3","bids":1,"ts":"1622644782239"}

…..

…..

 

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "cep");

DataStream<orders> stream = env.addSource(
        new FlinkKafkaConsumer<>("orders", new SimpleStringSchema(), properties))
        .map(json -> gson.fromJson(json, orders.class))
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<orders>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner((orders, timestamp) -> orders.ts));

Pattern<orders, ?> pattern = Pattern.<orders>begin(
        "start",
        AfterMatchSkipStrategy.skipPastLastEvent()).where(new SimpleCondition<orders>() {
    @Override
    public boolean filter(orders value) throws Exception {
        return value.product.equals("product1");
    }
}).times(1).followedBy("middle").where(new SimpleCondition<orders>() {
    @Override
    public boolean filter(orders value) throws Exception {
        return value.product.equals("product2");
    }
}).oneOrMore().until(new SimpleCondition<orders>() {
    @Override
    public boolean filter(orders value) throws Exception {
        return value.product.equals("product3");
    }
}).within(Time.seconds(10));

PatternStream<orders> patternStream = CEP.pattern(
        stream.keyBy((KeySelector<orders, String>) orders -> orders.user_id), pattern);

DataStream<String> alerts = patternStream.select((PatternSelectFunction<orders, String>) matches ->
        matches.get("start").get(0).user_id + "->" +
        matches.get("middle").get(0).ts);

alerts.print();

 

 

I have also attached the checkpoint file.

 

It looks like the NFA state keeps track of every key it has seen, together with its start state, and that leads to an ever-increasing checkpoint size when keys are not reused across patterns. Consequently, if I have a fixed number of keys, the size does not increase. Is this the expected behavior, and is my understanding correct?
Is there a way to drop these keys once the pattern is matched, or am I missing something here?

 

Thanks,

Mohit



Re: Flink cep checkpoint size

David Anderson-4
This sounds exactly like FLINK-19970: State leak in CEP Operators (expired events/keys not removed from state) [1]. If so, upgrading to 1.13 should take care of the problem.
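
For reference, a minimal sketch of what that upgrade could look like, assuming a Maven build with the Scala 2.11 artifacts (the exact 1.13.x patch version is illustrative, and every other Flink dependency would need to move to the same version):

<!-- Hypothetical pom.xml excerpt: bump flink-cep to a 1.13.x release -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep_2.11</artifactId>
    <version>1.13.1</version>
</dependency>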

Best,

David

[1] https://issues.apache.org/jira/browse/FLINK-19970



Re: Flink cep checkpoint size

Dawid Wysakowicz-2

Hi Mohit,

What David wrote is one potential explanation.

However, if I understand your description correctly, you have a keyed stream with, say, a single event per key that starts a partial match, and after that you never see another event for that key. If that is the case, a potential solution would be to add a time boundary within which the match must occur; otherwise the partial match will be purged [1] as the watermark for the partition progresses.
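
For illustration, a minimal sketch of attaching such a boundary to a pattern (the Event type and its "type" field here are hypothetical; Mohit's original pattern already ends with an equivalent within(Time.seconds(10)) clause):

Pattern<Event, ?> bounded = Pattern.<Event>begin("start")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event value) {
                return "first".equals(value.type);
            }
        })
        // Once the watermark passes a partial match's start timestamp plus
        // 10 seconds of event time without the pattern completing, the
        // partial match is pruned from state.
        .within(Time.seconds(10));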

Best,

Dawid

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/libs/cep/#withintime


Re: Flink cep checkpoint size

Singh, Mohit

Hi Dawid,

 

Thank you for your prompt reply.

 

As per David's advice, I upgraded to 1.13, but unfortunately it didn't help. Also, as you can see in my sample code, I don't have any partial matches, and I am already using a time boundary.

I looked further into FLINK-19970, and the issue I am facing appears to be exactly the "Test2 (constant key rotation)" case.

I was able to replicate the issue using the test program from FLINK-19970 with a few modifications: instead of keying by e.key I keyed by e.id, and I changed the pattern's times to 1. With that, I can see a continuous increase in checkpoint size.

 

public class CepBug {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(100);
        env.getCheckpointConfig().enableExternalizedCheckpoints(RETAIN_ON_CANCELLATION);

        Pattern<Event, ?> pattern = Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent())
                .times(1)
                .within(Time.milliseconds(10));

        KeyedStream<Event, Object> events = env.addSource(new RichParallelSourceFunction<Event>() {
                    private volatile boolean isRunning = true;
                    private long currentTimestamp = 0;
                    private int currentId = 0;

                    @Override
                    public void run(SourceContext<Event> sourceContext) throws Exception {
                        while (isRunning) {
                            sourceContext.collectWithTimestamp(
                                    new Event(
                                            currentId++,
                                            getRuntimeContext().getIndexOfThisSubtask(),
                                            currentTimestamp),
                                    currentTimestamp++);
                            Thread.sleep(10);
                        }
                    }

                    @Override
                    public void cancel() {
                        this.isRunning = false;
                    }
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.forMonotonousTimestamps())
                .keyBy(e -> e.id);

        SingleOutputStreamOperator<String> start = CEP.pattern(events, pattern)
                .process(new PatternProcessFunction<Event, String>() {
                    @Override
                    public void processMatch(
                            Map<String, List<Event>> map,
                            Context context,
                            Collector<String> collector) throws Exception {
                        collector.collect(
                                map.get("start")
                                        .stream()
                                        .map(e -> String.format("%s,%s", e.id, e.timestamp))
                                        .collect(Collectors.joining(";")));
                    }
                });

        start.print();
        env.execute();
    }

    public static class Event {
        public final int id;
        public final int key;
        public final long timestamp;

        public Event(int id, int key, long timestamp) {
            this.id = id;
            this.key = key;
            this.timestamp = timestamp;
        }
    }
}

 
