Tumbling windows - increasing checkpoint size over time

Wissman, Matt

Hello Flink Community,

I’m running a Flink pipeline that uses a tumbling window and incremental checkpoints with RocksDB, backed by S3. The number of objects in the window is stable, but over time the checkpoint size grows seemingly without bound. Within the first few hours after bringing the pipeline up, the checkpoint size is around 100 KB, but after a week of operation it grows to around 100 MB. The pipeline doesn’t use any Flink state besides the state the window uses. I think this has something to do with RocksDB’s compaction, but shouldn’t the tumbling window state expire and be purged from the checkpoint?

Flink version: 1.7.1

Thanks!

-Matt

Re: Tumbling windows - increasing checkpoint size over time

Guowei Ma
Hi Matt,
The total size of the window operator's state is related to the
number of windows. For example, if you use keyBy + a tumbling window,
there is one window per key, so the number of windows equals the number of keys.
Hope this helps.
Best,
Guowei
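
To make the point above concrete, here is a minimal plain-Java sketch (not Flink code; the class and method names are hypothetical). It models keyBy followed by a processing-time tumbling window with zero allowed lateness: elements are buffered per (key, window start), and a window is dropped once the clock passes its end. After one simulated minute, the number of open windows equals the number of keys, while the number of buffered elements depends on how many events arrived per key in the current window.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyedTumblingModel {
    private final long windowSizeMs;
    // (key, window start) -> buffered elements; a stand-in for per-key, per-window state.
    private final Map<String, List<String>> open = new HashMap<>();

    KeyedTumblingModel(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Assign an element to the tumbling window containing nowMs (zero offset).
    void add(String key, String element, long nowMs) {
        long start = nowMs - (nowMs % windowSizeMs);
        open.computeIfAbsent(key + "@" + start, id -> new ArrayList<>()).add(element);
    }

    // Fire and purge every window whose end is at or before nowMs.
    void advanceTo(long nowMs) {
        open.keySet().removeIf(id -> {
            long start = Long.parseLong(id.substring(id.lastIndexOf('@') + 1));
            return start + windowSizeMs <= nowMs;
        });
    }

    int openWindows() {
        return open.size();
    }

    int bufferedElements() {
        int n = 0;
        for (List<String> elements : open.values()) n += elements.size();
        return n;
    }

    public static void main(String[] args) {
        KeyedTumblingModel model = new KeyedTumblingModel(15_000L);
        String[] keys = {"a", "b", "c"};
        // One minute of processing time, one event per key per second.
        for (long t = 0; t < 60_000L; t += 1_000L) {
            model.advanceTo(t);
            for (String key : keys) model.add(key, "event", t);
        }
        System.out.println("open windows: " + model.openWindows());
        System.out.println("buffered elements: " + model.bufferedElements());
    }
}
```

With three keys and one event per key per second, it prints 3 open windows and 45 buffered elements (15 per key in the current 15 s window).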

Wissman, Matt <[hidden email]> wrote on Wed, May 27, 2020 at 3:35 AM:

>
> Hello Flink Community,
>
>
>
> I’m running a Flink pipeline that uses a tumbling window and incremental checkpoint with RocksDB backed by s3. The number of objects in the window is stable but overtime the checkpoint size grows seemingly unbounded. Within the first few hours after bringing the Flink pipeline up, the checkpoint size is around 100K but after a week of operation it grows to around 100MB. The pipeline isn’t using any other Flink state besides the state that the window uses. I think this has something to do with RocksDB’s compaction but shouldn’t the tumbling window state expire and be purged from the checkpoint?
>
>
>
> Flink Version 1.7.1
>
>
>
> Thanks!
>
>
>
> -Matt
Reply | Threaded
Open this post in threaded view
|

Re: Tumbling windows - increasing checkpoint size over time

Till Rohrmann
Hi Matt,

Could you give us a bit more information about the windows you are using? They are tumbling windows, but what's the size of the windows? Do you allow late events? What's your checkpoint interval?

Are you using event time? If yes, how is the watermark generated?

You said that the number of events per window is more or less constant. Does this also apply to the size of the individual events?

Cheers,
Till

On Wed, May 27, 2020 at 1:21 AM Guowei Ma <[hidden email]> wrote:

Re: Tumbling windows - increasing checkpoint size over time

Wissman, Matt

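Hello Till & Guowei,

Thanks for the replies! Here is a snippet of the window function:

  // Flink 1.7 imports; idKeySelector(), Aggregator and DataLayer are application code not shown here.
  import static org.apache.flink.streaming.api.windowing.time.Time.seconds;

  import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;

  SingleOutputStreamOperator<DataLayer> aggregatedStream = dataStream
          .keyBy(idKeySelector())                                 // one open window per key
          .window(TumblingProcessingTimeWindows.of(seconds(15)))  // 15 s processing-time tumbling windows
          .apply(new Aggregator())                                // WindowFunction: the window keeps every element in state until it fires
          .name("Aggregator")
          .setParallelism(3);
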
Checkpoint interval: 2 s while the checkpoint size grew from 100 KB to 100 MB (we’ve since changed it to 5 minutes, which has slowed the checkpoint size growth)

Lateness allowed: 0

Watermarks: nothing is set in terms of watermarks. Do they apply to processing time?

The set of keys processed in the stream is stable over time.

The checkpoint size actually looks pretty stable now that the interval was increased. Is it possible that the short checkpoint interval prevented compaction?

Thanks!

-Matt
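
As a rough sense of scale for the two intervals mentioned above, the plain-Java sketch below (hypothetical names) counts how many checkpoints fall into one 15 s window and into one week. At 2 s, each window is captured by seven or eight checkpoints and the job takes about 302,400 checkpoints per week; at 5 minutes, it is 2,016 per week and most windows open and close between two checkpoints. This only counts checkpoints; it is not an explanation of the size growth.

```java
public class CheckpointCadence {
    static final long WEEK_MS = 7L * 24 * 60 * 60 * 1000;

    // Number of checkpoints triggered during a span of time at a fixed interval (integer division).
    static long checkpointsIn(long spanMs, long intervalMs) {
        return spanMs / intervalMs;
    }

    public static void main(String[] args) {
        long windowMs = 15_000L;                 // 15 s tumbling windows
        long[] intervalsMs = {2_000L, 300_000L}; // 2 s and 5 min checkpoint intervals
        for (long interval : intervalsMs) {
            System.out.printf("interval %d ms: %d per 15 s window, %d per week%n",
                    interval, checkpointsIn(windowMs, interval), checkpointsIn(WEEK_MS, interval));
        }
    }
}
```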

 

 

From: Till Rohrmann <[hidden email]>
Date: Wednesday, May 27, 2020 at 9:00 AM
To: Guowei Ma <[hidden email]>
Cc: "Wissman, Matt" <[hidden email]>, "[hidden email]" <[hidden email]>
Subject: Re: Tumbling windows - increasing checkpoint size over time

Re: Tumbling windows - increasing checkpoint size over time

Till Rohrmann
Hi Matt,

When using tumbling windows, the checkpoint size depends not only on the number of keys (which is equivalent to the number of open windows) but also on how many events arrive for each open window, because the window stores every event in its state. Hence, you may see different checkpoint sizes depending on the actual data distribution, which can change over time. Have you checked whether the data distribution and rate are constant over time?

What are the expected number of keys, the size of events and the number of events per key per second? Based on this information, one could try to estimate an upper bound for the state size.

Cheers,
Till
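
Till's suggested estimate can be written out directly: with at most one open window per key and a window function that buffers every element, the buffered state is roughly keys × (events per key per second × window length in seconds) × bytes per event. The sketch below is a back-of-the-envelope helper in plain Java with a hypothetical name; the workload numbers in main are placeholders, not figures from this thread, and the bound ignores serialization and RocksDB overhead, timers, and any obsolete data not yet compacted away.

```java
public class WindowStateBound {
    // Rough upper bound, in bytes, on buffered window state for a keyed tumbling window
    // whose window function keeps every element (e.g. a WindowFunction passed to apply()).
    // Assumes at most one open window per key; ignores serialization/RocksDB overhead and timers.
    static long upperBoundBytes(long numKeys, double eventsPerKeyPerSecond,
                                long windowSizeSeconds, long bytesPerEvent) {
        double eventsPerWindowPerKey = eventsPerKeyPerSecond * windowSizeSeconds;
        return (long) Math.ceil(numKeys * eventsPerWindowPerKey * bytesPerEvent);
    }

    public static void main(String[] args) {
        // Placeholder workload, not data from the thread.
        long bound = upperBoundBytes(1_000L, 2.0, 15L, 500L);
        System.out.printf("upper bound ~ %d bytes (%.1f MB)%n", bound, bound / 1_000_000.0);
    }
}
```

With the placeholder workload (1,000 keys, 2 events per key per second, 15 s windows, 500-byte events) it reports 15,000,000 bytes, about 15 MB.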

On Wed, May 27, 2020 at 8:19 PM Wissman, Matt <[hidden email]> wrote:

Re: Tumbling windows - increasing checkpoint size over time

Wissman, Matt

Till,

I’ll have to calculate the theoretical upper bound for our window state. Our data distribution and rate have a predictable pattern, but the data rate pattern didn’t match the checkpoint size growth.

Here is a screenshot of the checkpoint size for the pipeline. The yellow section is when we had the checkpoint interval at 2 s; the size seems to grow linearly and indefinitely. The blue, red and orange lines are in line with what I’d expect in terms of checkpoint size (100 KB to 2 MB).

The incoming stream data for the whole time period is consistent (follows the same pattern).

Changing the checkpoint interval seemed to fix the problem of the large and growing checkpoint size, but I’m not sure why.

Thanks!

-Matt

From: Till Rohrmann <[hidden email]>
Date: Thursday, May 28, 2020 at 10:48 AM
To: "Wissman, Matt" <[hidden email]>
Cc: Guowei Ma <[hidden email]>, "[hidden email]" <[hidden email]>
Subject: Re: Tumbling windows - increasing checkpoint size over time

Re: Tumbling windows - increasing checkpoint size over time

Guowei Ma
Hi,
1. I am not a RocksDB expert. However, I think state garbage collection depends on RocksDB compaction, especially when the checkpoint interval is 2 s. This is because the window's elements are still in the SST files even after the window has been triggered.
2. Have you tried a checkpoint interval of 15 s? I guess it might reduce the state size.
3. Could you share your RocksDB configuration? It could help the other state experts tell whether this is related to RocksDB or not.
Best,
Guowei
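
Guowei's first point (purged window elements can still sit in SST files until compaction) can be illustrated with a deliberately simplified LSM model in plain Java. This is a toy, not RocksDB's leveled compaction or Flink's incremental checkpointing: writes go to a memtable, each flush (as a checkpoint forces) produces a new immutable file, deletes are written as tombstones, and a single full compaction merges everything and drops deleted entries. All names are hypothetical. The gap between "retained" and "live" entries stands in for data that files still hold, and that an incremental checkpoint may still reference, even though the windows have been purged.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ToyLsm {
    // Immutable "SST files": key -> value, where a null value is a tombstone (delete marker).
    private final List<Map<String, String>> files = new ArrayList<>();
    private Map<String, String> memtable = new HashMap<>();

    void put(String key, String value) { memtable.put(key, value); }

    void delete(String key) { memtable.put(key, null); }

    // Write the memtable out as a new immutable file, as a checkpoint-triggered flush would.
    void flush() {
        if (!memtable.isEmpty()) {
            files.add(memtable);
            memtable = new HashMap<>();
        }
    }

    // Entries physically held in files, including tombstones and the values they shadow.
    int retainedEntries() {
        int n = 0;
        for (Map<String, String> file : files) n += file.size();
        return n;
    }

    // Newest-wins merge of all files with tombstones dropped: the logically live data.
    private Map<String, String> merged() {
        Map<String, String> result = new HashMap<>();
        for (Map<String, String> file : files) result.putAll(file);
        result.values().removeIf(v -> v == null);
        return result;
    }

    int liveEntries() { return merged().size(); }

    // Full compaction: rewrite everything into one file holding only live entries.
    void compact() {
        Map<String, String> live = merged();
        files.clear();
        if (!live.isEmpty()) files.add(live);
    }

    public static void main(String[] args) {
        ToyLsm db = new ToyLsm();
        int keys = 3, elementsPerWindow = 4, windows = 10;
        for (int w = 0; w < windows; w++) {
            for (int k = 0; k < keys; k++)
                for (int i = 0; i < elementsPerWindow; i++)
                    db.put("key" + k + "/window" + w + "/" + i, "event");
            db.flush(); // checkpoint while the window is open
            for (int k = 0; k < keys; k++)
                for (int i = 0; i < elementsPerWindow; i++)
                    db.delete("key" + k + "/window" + w + "/" + i); // window fired and purged
            db.flush(); // checkpoint after the window fired
        }
        System.out.println("before compaction: live=" + db.liveEntries() + " retained=" + db.retainedEntries());
        db.compact();
        System.out.println("after compaction:  live=" + db.liveEntries() + " retained=" + db.retainedEntries());
    }
}
```

Simulating ten windows over three keys with a flush before and after each window fires, it reports 0 live entries but 240 retained entries before compaction, and 0 of each afterwards.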


Wissman, Matt <[hidden email]> wrote on Fri, May 29, 2020 at 10:30 PM:


Re: Tumbling windows - increasing checkpoint size over time

Wissman, Matt

Guowei,

I had a different Flink app that was using 10 or 15 s intervals; it showed similar behavior, but not nearly as bad as the 2 s interval pipeline. Both have much longer checkpoint intervals now.

Here is the state config:

    state.backend: rocksdb
    state.checkpoints.dir: {{ .Values.flink.checkpointUrl }}/checkpoints
    state.savepoints.dir: {{ .Values.flink.checkpointUrl }}/savepoints
    state.backend.incremental: true
    state.backend.rocksdb.localdir: /tmp/taskmanager

Thanks!

-Matt

From: Guowei Ma <[hidden email]>
Date: Monday, June 1, 2020 at 1:01 AM
To: "Wissman, Matt" <[hidden email]>
Cc: Till Rohrmann <[hidden email]>, "[hidden email]" <[hidden email]>
Subject: Re: Tumbling windows - increasing checkpoint size over time