Non incremental window function accumulates unbounded state with RocksDb

Non incremental window function accumulates unbounded state with RocksDb

shater93

Hello,

I have a Flink pipeline that reads data from Kafka, keys it (in the pseudocode example below it is keyed on the first letter of the string; in our pipeline it is keyed on a predefined key) and processes it in sliding windows of 60 minutes that slide every 10 minutes. The time characteristic is event time, and the windows process the data only when they fire; there is no incremental processing of the windowed data.
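To see what a 60-minute window sliding every 10 minutes means for state, here is a minimal sketch of the window-assignment arithmetic. It assumes a zero window offset and non-negative millisecond timestamps and follows the window-start computation that, as far as I know, Flink's `SlidingEventTimeWindows` uses. `SlidingWindowSketch`, `Window` and `windowsFor` are illustrative names, not Flink API.

```scala
object SlidingWindowSketch {
  // Illustrative window type: [start, end) in epoch milliseconds.
  final case class Window(start: Long, end: Long)

  // All sliding windows that contain `timestamp` (assumes offset 0 and timestamp >= 0).
  def windowsFor(timestamp: Long, size: Long, slide: Long): List[Window] = {
    // Start of the latest window that contains the timestamp.
    val lastStart = timestamp - (timestamp % slide)
    // Walk backwards one slide at a time while the window still covers the timestamp.
    Iterator.iterate(lastStart)(_ - slide)
      .takeWhile(start => start > timestamp - size)
      .map(start => Window(start, start + size))
      .toList
  }
}
```

With a 60-minute size and a 10-minute slide, an element at minute 65 lands in six windows, from [60, 120) back to [10, 70). Because the window function is not incremental, each of those windows keeps its own copy of the element until it fires and is cleared. This duplication applies to the heap and RocksDB backends alike, so on its own it cannot explain a difference between them.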

When running with the heap backend, the state behaves “normally”, i.e. it stays within the size expected for the data buffered in the windows (~200 MB for this application) and remains bounded at around this size regardless of how long the pipeline runs. However, if the state backend is changed to RocksDB, the state starts to grow indefinitely (at least we haven’t seen it stop growing), reaching 70-80 GB after just over a month of runtime.

I took a savepoint of the state, downloaded it and analysed it, which showed that the state contained data from the whole lifetime of the pipeline, with roughly the same amount for each day. I interpret this as the state having accumulated old data that should have been deleted when the windows were cleared. It is worth noting that the state consists only of the input Strings, so it shouldn’t have anything to do with the histogram calculation?

I have tried reducing level0_file_num_compaction_trigger to 1 and reducing the base file size as well as the target file size, to trigger more compactions in the hope that compaction would remove the obsolete data. This gave no improvement at all (it actually got somewhat worse).
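Compaction discards entries that have been overwritten or deleted (tombstoned); it cannot discard keys that are still live. The toy model below is a deliberate simplification that treats compaction as a single full merge and is not RocksDB's actual algorithm (`CompactionSketch`, `Record` and `compact` are illustrative names). It shows why more aggressive compaction settings would not shrink the state if the window contents were never deleted in the first place.

```scala
object CompactionSketch {
  // A write in a toy LSM log: Some(value) is a put, None is a delete (tombstone).
  final case class Record(key: String, value: Option[String])

  // Toy full compaction: keep only the newest record per key, then drop keys whose
  // newest record is a tombstone. Live keys always survive, however often this runs.
  def compact(log: Seq[Record]): List[Record] = {
    val newest = log.foldLeft(Map.empty[String, Record])((acc, r) => acc.updated(r.key, r))
    newest.values.filter(_.value.isDefined).toList.sortBy(_.key)
  }
}
```

Applied to a log in which `window-1` was deleted, compaction leaves only `window-2`; applied to a log without any deletes, it returns every entry unchanged, no matter how many times it runs.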

Please see the pseudocode below for an example. The actual pipeline is more complex and uses other classes than a String input and a “histogram” output class. Do you have any input or ideas on how the state could be manageable with the heap backend but completely unmanageable with RocksDB?

Best regards,

William

 

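import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.util.Collector

// Placeholder output type; the real pipeline uses its own histogram class.
case class Histogram(key: Char, windowEnd: Long, count: Int)

// Non-incremental window function: the window buffers all of its input in state and
// the whole Iterable is handed to apply() only when the window fires.
class HistogramFunction extends WindowFunction[String, Histogram, Char, TimeWindow] {

  // Calculate the histogram (placeholder: counts the elements in the window).
  private def process(key: Char, window: TimeWindow, input: Iterable[String]): Histogram =
    Histogram(key, window.getEnd, input.size)

  override def apply(key: Char, window: TimeWindow, input: Iterable[String],
                     out: Collector[Histogram]): Unit =
    out.collect(process(key, window, input))
}

object Pseudocode {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // RocksDB state backend; the second argument enables incremental checkpoints.
    val stateBackend: StateBackend = new RocksDBStateBackend("s3://..", true)
    env.setStateBackend(stateBackend)
    env.enableCheckpointing(900000)
    env.getCheckpointConfig.setCheckpointTimeout(400000)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(450000)

    val properties = new Properties() // Kafka connection properties omitted
    val stream: DataStream[String] = env
      .addSource(new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties))
    // Event-time timestamp and watermark assignment is omitted here, as in the original pseudocode.

    stream
      .keyBy(e => e.charAt(0))
      .timeWindow(Time.minutes(60), Time.minutes(10))
      .apply(new HistogramFunction())
      .name("Pseudocode")
      .uid("Pseudocode")

    env.execute("Pseudocode")
  }
}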

 

William Jonsson
Systems Engineer
Fleet Perception for Maintenance
NIRA Dynamics AB
Wallenbergs Gata 4
583 30 Linköping
Sweden
Mobile: +46 722 178 247
[hidden email]
www.niradynamics.se
Together for smarter safety

Re: Non incremental window function accumulates unbounded state with RocksDb

Yun Tang
Hi William

I don't believe the same job would have 70~80 GB of state with RocksDB but only 200 MB with the HeapStateBackend, even allowing for RocksDB's space amplification. Are you sure the job received the same input throughput with both state backends and that both ran without any failover? Could you take a savepoint of the job with each state backend and compare the savepoint sizes? Also, which version of Flink are you using?
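Comparing the two savepoints as suggested only needs the total size of each savepoint directory. A minimal sketch, assuming both savepoints have been copied to local disk (`SavepointSize` and `totalBytes` are illustrative names):

```scala
import java.nio.file.{Files, Path}
import scala.collection.JavaConverters._

object SavepointSize {
  // Sum the sizes of all regular files below `dir`, e.g. a savepoint directory on local disk.
  def totalBytes(dir: Path): Long = {
    val paths = Files.walk(dir)
    try paths.iterator().asScala.filter(p => Files.isRegularFile(p)).map(p => Files.size(p)).sum
    finally paths.close() // Files.walk keeps directory handles open until the stream is closed
  }
}
```

Running `totalBytes` on the heap-backend savepoint and on the RocksDB-backend savepoint gives directly comparable byte counts.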

Best
Yun Tang

From: William Jonsson <[hidden email]>
Sent: Friday, August 30, 2019 17:04
To: [hidden email] <[hidden email]>
Cc: Fleet Perception for Maintenance <[hidden email]>
Subject: Non incremental window function accumulates unbounded state with RocksDb
 


Re: Non incremental window function accumulates unbounded state with RocksDb

shater93

Thanks for your answer Yun.

 

I agree, I don’t believe that either; however, that’s my empirical observation. Those statistics come from savepoints. The jobs consume from a production Kafka, so no, the input is not exactly the same. However, the statistics come from several runs spread out in time, so they should not reflect temporal effects. There are no failovers in the pipeline during runtime. Calculating from the size and rate of the data (how often we receive data and how large the data type is), the data buffered in the windows should be a little under 200 MB, which matches what the heap backend shows. I agree that space amplification cannot account for a factor of 400, let alone continued growth, with RocksDB. I’ve spent some time checking whether we are doing anything unusual, but I can’t find anything, so it would be interesting to hear whether anyone has had the same experience.
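The back-of-the-envelope calculation described above can be written down as a small model. This is a hypothetical illustration, not the calculation actually used for this pipeline: it assumes a constant event rate, a fixed size per event, window offset 0, zero allowed lateness and no data ahead of the watermark, and it counts an element once for every window it is assigned to. `BufferedStateEstimate` and `estimateBytes` are illustrative names.

```scala
object BufferedStateEstimate {
  // Rough size of the state buffered by non-incremental sliding event-time windows at a given
  // watermark. Assumes (all hypothetical): constant event rate, fixed bytes per event, window
  // offset 0, non-negative timestamps, zero allowed lateness and no data ahead of the watermark.
  def estimateBytes(eventsPerMs: Double, bytesPerEvent: Long,
                    sizeMs: Long, slideMs: Long, watermark: Long): Double = {
    // Start of the newest window that contains the watermark.
    val lastStart = watermark - watermark % slideMs
    // Windows ending at or before the watermark are already cleaned up, so only windows that
    // contain the watermark (and have not reached their cleanup time) still hold state.
    val liveStarts = Iterator.iterate(lastStart)(_ - slideMs)
      .takeWhile(start => start > watermark - sizeMs)
      .filter(start => watermark < start + sizeMs - 1)
    // Each live window holds its own copy of every element between its start and the watermark.
    liveStarts.map(start => (watermark - start) * eventsPerMs * bytesPerEvent).sum
  }
}
```

For example, with `sizeMs = 60`, `slideMs = 10`, one 1-byte event per millisecond and a watermark of 600, the live windows hold 0 + 10 + 20 + 30 + 40 + 50 = 150 bytes, about 2.5 times a single window's worth of data. The model does not depend on the state backend, so it predicts a bounded state for heap and RocksDB alike.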

 

The pipeline is currently running on Flink 1.7.2

 

Best regards and wish you a pleasant day,

William

 

From: Yun Tang <[hidden email]>
Date: Friday, 30 August 2019 at 11:42
To: William Jonsson <[hidden email]>, "[hidden email]" <[hidden email]>
Cc: Fleet Perception for Maintenance <[hidden email]>
Subject: Re: Non incremental window function accumulates unbounded state with RocksDb

 


Re: Non incremental window function accumulates unbounded state with RocksDb

Yun Tang
Hi William

I think there might be another possible cause: RocksDB can perform about 10x worse than the heap state backend. Have you checked the job's current watermark (in the web UI) to see whether the windows are triggered as expected, and whether the RocksDB job is back-pressured? If state stays in windows that have not been triggered, the state could become larger. (However, that still does not seem to explain a factor of 400.)
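The watermark point can be made concrete. For event-time windows, a window's buffered elements are purged only when the watermark reaches the window's cleanup time, which (as far as I know from Flink's window operator) is the window's last timestamp, end - 1 ms, plus the allowed lateness. The sketch below models that rule; `WindowCleanupSketch`, `Window`, `cleanupTime` and `retained` are illustrative names, not Flink API.

```scala
object WindowCleanupSketch {
  // Illustrative event-time window [start, end) in milliseconds.
  final case class Window(start: Long, end: Long) {
    def maxTimestamp: Long = end - 1 // last timestamp that belongs to the window
  }

  // Cleanup time: last timestamp of the window plus allowed lateness, guarding against overflow.
  def cleanupTime(w: Window, allowedLateness: Long): Long = {
    val t = w.maxTimestamp + allowedLateness
    if (t >= w.maxTimestamp) t else Long.MaxValue
  }

  // Windows whose buffered state is still held at the given watermark: a window's state
  // is purged only once the watermark has reached its cleanup time.
  def retained(windows: Seq[Window], watermark: Long, allowedLateness: Long = 0L): Seq[Window] =
    windows.filterNot(w => watermark >= cleanupTime(w, allowedLateness))
}
```

If the watermark stops advancing, no window is ever cleaned up and state grows without bound. A stuck watermark would also stop the windows from firing, so checking whether histograms are still emitted for recent windows is a quick way to confirm or rule this out.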

Best
Yun Tang

From: William Jonsson <[hidden email]>
Sent: Friday, August 30, 2019 18:22
To: Yun Tang <[hidden email]>; [hidden email] <[hidden email]>
Subject: Re: Non incremental window function accumulates unbounded state with RocksDb
 
