Gradually increasing checkpoint size

Dan
Hi!

I'm running a backfill Flink stream job over older data.  It has multiple interval joins.  I noticed my checkpoint is regularly gaining in size.  I'd expect my checkpoints to stabilize and not grow.

Is there a setting to prune useless data from the checkpoint?  My top guess is that my checkpoint has a bunch of useless state in it.

- Dan

Re: Gradually increasing checkpoint size

Yun Gao
Hi Dan,

Have you used too large an upperBound or lowerBound?

If not, could you also check the watermark strategy?
The interval join operator depends on event-time
timers for cleanup, and event-time timers are
triggered by watermarks.
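
To make the dependency on watermarks concrete, here is a minimal sketch in plain Java (not Flink code). It models one side of an interval join as a buffer of rows grouped by event timestamp, where a bucket is only dropped once the watermark passes its cleanup time. The class name `CleanupSketch` and the `retentionMillis` parameter are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Toy model of one buffered side of an interval join; not Flink code. */
public class CleanupSketch {
    // Buffered rows grouped by event timestamp.
    private final TreeMap<Long, List<String>> buffer = new TreeMap<>();
    // Hypothetical parameter: how long a row stays joinable after its own timestamp.
    private final long retentionMillis;

    public CleanupSketch(long retentionMillis) {
        this.retentionMillis = retentionMillis;
    }

    public void add(long timestamp, String row) {
        buffer.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(row);
    }

    /** Drops every bucket whose cleanup time (timestamp + retention) is at or below the watermark. */
    public void onWatermark(long watermark) {
        buffer.headMap(watermark - retentionMillis, true).clear();
    }

    public int bufferedRows() {
        return buffer.values().stream().mapToInt(List::size).sum();
    }

    public static void main(String[] args) {
        CleanupSketch side = new CleanupSketch(1_000L);
        for (long ts = 0; ts < 10_000L; ts += 100L) {
            side.add(ts, "row@" + ts);
        }
        System.out.println("before any watermark: " + side.bufferedRows());
        side.onWatermark(5_000L);
        System.out.println("after watermark 5000: " + side.bufferedRows());
    }
}
```

In this model, if `onWatermark` is never called (or is always called with the same value), nothing is ever removed and the buffer grows with every new row, which is the symptom a stalled watermark would produce.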

Best,
Yun


------------------Original Mail ------------------
Sender:Dan Hill <[hidden email]>
Send Date:Mon Mar 8 14:59:48 2021
Recipients:user <[hidden email]>
Subject:Gradually increasing checkpoint size
Hi!

I'm running a backfill Flink stream job over older data.  It has multiple interval joins.  I noticed my checkpoint is regularly gaining in size.  I'd expect my checkpoints to stabilize and not grow.

Is there a setting to prune useless data from the checkpoint?  My top guess is that my checkpoint has a bunch of useless state in it.

- Dan

Re: Gradually increasing checkpoint size

Dan
Hi Yun!

Thanks for the quick reply.

One of the lowerBounds is large but the table being joined with is ~500 rows.  I also have my own operator that only outputs the first value.

public class OnlyFirstUser<T extends GeneratedMessageV3> extends RichFlatMapFunction<T, T> {

    private transient ValueState<Boolean> alreadyOutputted;

    @Override
    public void flatMap(T value, Collector<T> out) throws Exception {
        if (!alreadyOutputted.value()) {
            alreadyOutputted.update(true);
            out.collect(value);
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Boolean> descriptor =
                new ValueStateDescriptor<>(
                        "alreadyOutputted", // the state name
                        TypeInformation.of(new TypeHint<Boolean>() {}), // type information
                        false); // default value of the state, if nothing was set
        alreadyOutputted = getRuntimeContext().getState(descriptor);
    }
}


All of my inputs have this watermark strategy.  In the Flink UI, early in the job run, I see "Low Watermarks" on each node and they increase.  After some checkpoint failures, low watermarks stop appearing in the UI.


.assignTimestampsAndWatermarks(
    WatermarkStrategy.<GeneratedMessageV3>forBoundedOutOfOrderness(Duration.ofSeconds(1))
        .withIdleness(Duration.ofMinutes(1)));
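
For readers unfamiliar with this strategy, the sketch below models in plain Java how a bounded-out-of-orderness watermark trails the largest timestamp seen so far. It is a simplified stand-in, not Flink's implementation; the class name `BoundedWatermarkSketch` is hypothetical, and idleness handling is left out.

```java
/** Toy model of a bounded-out-of-orderness watermark generator; not Flink code. */
public class BoundedWatermarkSketch {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    public BoundedWatermarkSketch(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Start high enough that the first watermark computation does not underflow.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    /** Only a timestamp larger than anything seen so far moves the watermark forward. */
    public void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** The watermark trails the largest timestamp by the allowed out-of-orderness. */
    public long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch generator = new BoundedWatermarkSketch(1_000L);
        long[] eventTimestamps = {5_000L, 4_000L, 7_000L};
        for (long ts : eventTimestamps) {
            generator.onEvent(ts);
            System.out.println("event " + ts + " -> watermark " + generator.currentWatermark());
        }
    }
}
```

The point for this thread: the watermark only advances when records with larger timestamps arrive, so if the inputs stop producing records (for example while checkpoints are failing), event-time cleanup timers downstream would stop firing too.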



Thanks Yun!



Re: Re: Gradually increasing checkpoint size

Yun Gao
Hi Dan,

Regarding the original checkpoint size problem, could you also check
in the checkpoint UI which tasks' state is growing? For example, the
attached operator has an `alreadyOutputted` value state, which looks like
it would keep growing if new keys keep arriving?
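
A plain-Java sketch of why this kind of per-key flag grows: each distinct key adds one entry that is never removed. This is only a model of keyed state using a `HashMap`; the class name `FirstPerKeySketch` is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of a per-key "already emitted" flag; not Flink code. */
public class FirstPerKeySketch {
    // One entry per distinct key ever seen; nothing here ever removes an entry.
    private final Map<String, Boolean> alreadyOutputted = new HashMap<>();

    /** Returns true only the first time a key is offered. */
    public boolean offer(String key) {
        return alreadyOutputted.putIfAbsent(key, Boolean.TRUE) == null;
    }

    public int stateEntries() {
        return alreadyOutputted.size();
    }

    public static void main(String[] args) {
        FirstPerKeySketch firstOnly = new FirstPerKeySketch();
        for (int i = 0; i < 1_000; i++) {
            firstOnly.offer("user-" + i); // every key is new
            firstOnly.offer("user-" + i); // the repeat is filtered out, state is unchanged
        }
        System.out.println("state entries: " + firstOnly.stateEntries());
    }
}
```

Whether this matters depends on how many distinct keys the job sees. If it does, Flink's state TTL is one possible way to bound it, at the cost of re-emitting a key after its entry expires.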

Best,
Yun



Re: Re: Gradually increasing checkpoint size

Dan
Hi Yun!

That advice was useful.  The state for that operator is very small (31 KB).  Most of the checkpoint size is in a couple of simple DataStream.intervalJoin operators.  The time intervals are fairly short.

I'm going to try running the code with some small configuration changes.  One thing I did notice is that I set a positive value for the relativeUpperBound.  I'm not sure if I found a bug in IntervalJoinOperator.  The logic in IntervalJoinOperator.onEventTime needs an exact timestamp for cleanup.  It has some logic for cleaning up the right side that uses timerTimestamp + lowerBound.  However, processElement doesn't use the same logic when creating a timer (I only see + lowerBound).  Maybe I'm misreading the code.  It feels like a bug.



Re: Gradually increasing checkpoint size

Dawid Wysakowicz-2

Hey Dan,

I think the logic should be correct. Note that processElement uses relativeUpperBound and relativeLowerBound, which are the global bounds, inverted for the right side:

relativeUpperBound = upperBound for left and -lowerBound for right

relativeLowerBound = lowerBound for left and -upperBound for right

Therefore the cleanup in onEventTime effectively uses the same logic. If I understand it correctly, this trick was introduced so that a single processElement method can serve both sides instead of being duplicated.
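
The bookkeeping described above can be checked with a small plain-Java model. The formulas are reconstructed from this discussion (the relative bounds listed above and Dan's mention of timerTimestamp + lowerBound), not copied from the Flink source, and all method names are hypothetical, so compare against IntervalJoinOperator in your Flink version before relying on it.

```java
/** Toy model of interval-join cleanup timers with relative bounds; not Flink code. */
public class RelativeBoundsSketch {

    /** Shared logic: keep a record until no later record on the other side can match it. */
    static long cleanupTimerFor(long recordTimestamp, long relativeUpperBound) {
        return relativeUpperBound > 0L ? recordTimestamp + relativeUpperBound : recordTimestamp;
    }

    /** Left side: relativeUpperBound is the global upperBound. */
    static long leftTimer(long recordTimestamp, long upperBound) {
        return cleanupTimerFor(recordTimestamp, upperBound);
    }

    /** Right side: relativeUpperBound is the negated global lowerBound. */
    static long rightTimer(long recordTimestamp, long lowerBound) {
        return cleanupTimerFor(recordTimestamp, -lowerBound);
    }

    /** Recovers the left bucket timestamp from the timer timestamp. */
    static long leftBucketToRemove(long timerTimestamp, long upperBound) {
        return upperBound <= 0L ? timerTimestamp : timerTimestamp - upperBound;
    }

    /** Recovers the right bucket timestamp from the timer timestamp. */
    static long rightBucketToRemove(long timerTimestamp, long lowerBound) {
        return lowerBound <= 0L ? timerTimestamp + lowerBound : timerTimestamp;
    }

    public static void main(String[] args) {
        long recordTimestamp = 10_000L;
        long lowerBound = -2_000L;
        long upperBound = 3_000L;
        long leftTimer = leftTimer(recordTimestamp, upperBound);
        long rightTimer = rightTimer(recordTimestamp, lowerBound);
        System.out.println("left timer " + leftTimer
                + " removes bucket " + leftBucketToRemove(leftTimer, upperBound));
        System.out.println("right timer " + rightTimer
                + " removes bucket " + rightBucketToRemove(rightTimer, lowerBound));
    }
}
```

In this model the timer created for a record always maps back to that record's own timestamp, for positive, zero, and negative bounds on either side, which is the sense in which the two code paths "use the same logic".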

There might be a bug somewhere, but I don't think it's where you pointed. I'd suggest first investigating the progress of the watermarks.

Best,

Dawid


Re: Gradually increasing checkpoint size

Dan
I figured it out.  I have some records with the same key and I was doing an IntervalJoin.  In one of the IntervalJoin implementations that I found, it looks like the runtime increases exponentially when there are duplicate keys.  I introduced a de-duping step and it works a lot faster.
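
The thread does not say which implementation this was or what the de-duping step looked like. As a rough illustration only, the plain-Java sketch below counts the pairs a naive per-key join emits (ignoring time bounds) with and without duplicates. It shows multiplicative growth per key, which may or may not be the exact effect observed here; the names `DuplicateKeyJoinSketch`, `joinedPairs`, and `dedupe` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;

/** Toy model of join fan-out with duplicate keys; not Flink code. */
public class DuplicateKeyJoinSketch {

    /** Counts the pairs a naive per-key join would emit, ignoring time bounds. */
    public static long joinedPairs(List<String> leftKeys, List<String> rightKeys) {
        Map<String, Long> rightCounts = new HashMap<>();
        for (String key : rightKeys) {
            rightCounts.merge(key, 1L, Long::sum);
        }
        long pairs = 0L;
        for (String key : leftKeys) {
            pairs += rightCounts.getOrDefault(key, 0L);
        }
        return pairs;
    }

    /** Keeps the first occurrence of each key, preserving input order. */
    public static List<String> dedupe(List<String> keys) {
        return new ArrayList<>(new LinkedHashSet<>(keys));
    }

    public static void main(String[] args) {
        List<String> left = Collections.nCopies(100, "k");
        List<String> right = Collections.nCopies(100, "k");
        System.out.println("pairs with duplicates: " + joinedPairs(left, right));
        System.out.println("pairs after de-duping: " + joinedPairs(dedupe(left), dedupe(right)));
    }
}
```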
