List State in RichWindowFunction leads to RocksDb memory leak


swiesman

I am working on a program that uses a complex window and have run into some issues. It is a 1-hour window with 7 days of allowed lateness and a custom trigger that emits intermediate results every 5 minutes of processing time; when the 7 days of event time are up, a final fire is triggered and the window is purged. The window functions are an incremental reduce function plus a RichWindowFunction that performs some final computation before outputting each result. I am building up a collection of objects, so each time the RichWindowFunction runs I want to diff against the previous set and only output elements that have changed.

Example:
// In reality I am working with more complex objects than ints.
// (Collection and Key are application-specific types.)

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.RichWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

class CustomRichWindowFunction extends RichWindowFunction[Collection[Int], Int, Key, TimeWindow] {

  @transient var state: ListState[Int] = _

  override def open(parameters: Configuration): Unit = {
    val info = new ListStateDescriptor("previous", createTypeInformation[Int])
    state = getRuntimeContext.getListState(info)
  }

  override def apply(key: Key, window: TimeWindow, input: Iterable[Collection[Int]], out: Collector[Int]): Unit = {
    val current = input.iterator.next
    val previous = state.get().iterator.asScala.toSet
    state.clear() // drop the previous snapshot; it is rebuilt below

    for (elem <- current) {
      if (!previous.contains(elem)) {
        out.collect(elem)
      }
      state.add(elem) // store for the next run
    }
  }
}

 

The issue is that this causes a memory leak with RocksDB. When the WindowOperator executes clearAllState at the end of the window's lifetime, it does not clear the ListState or any other custom partitioned state the function may have created, so my state size grows indefinitely. It seems to me that a RichWindowFunction should have a clear method, similar to triggers, for cleaning up state when the window is destroyed.
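
Something along these lines (purely hypothetical; no such hook exists in Flink 1.2):

import org.apache.flink.streaming.api.scala.function.RichWindowFunction
import org.apache.flink.streaming.api.windowing.windows.Window

// Hypothetical cleanup hook, mirroring Trigger.clear: the WindowOperator would
// invoke it when the window is purged so the function can drop any partitioned
// state it registered (such as the ListState above).
abstract class ClearableWindowFunction[IN, OUT, KEY, W <: Window]
    extends RichWindowFunction[IN, OUT, KEY, W] {

  def clear(key: KEY, window: W): Unit
}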

 

Barring that, I can envision two ways of solving this problem, but I have come up short trying to implement them.

 

1) If I had access to the watermark from within apply, I could use it in conjunction with the TimeWindow passed in to tell whether my final event-time timer had fired, which would let me manually clear the state:

i.e.:

if (watermark < window.getEnd + Time.days(7).toMilliseconds) {
  state.add(elem) // the window is not finished, so it is safe to store state
}

 

2) Pass my elements into a second window with a count trigger of 1 and a custom evictor that always keeps the two most recent elements, and do my diff there (a rough sketch of the evictor follows below).

Semantically this seems to work, but in practice it causes my checkpoint times to grow 10x and I fail every 5th to 7th checkpoint.
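
The evictor I have in mind looks roughly like this (untested sketch against the Flink 1.2 Evictor interface; the class name is made up):

import java.lang.{Iterable => JIterable}
import org.apache.flink.streaming.api.windowing.evictors.Evictor
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue

// Keeps only the two most recent elements in the window buffer.
class KeepLastTwoEvictor[T, W <: Window] extends Evictor[T, W] {

  override def evictBefore(elements: JIterable[TimestampedValue[T]], size: Int,
                           window: W, ctx: Evictor.EvictorContext): Unit = {
    var toEvict = size - 2 // drop the oldest elements, keep the newest two
    val it = elements.iterator()
    while (it.hasNext && toEvict > 0) {
      it.next()
      it.remove()
      toEvict -= 1
    }
  }

  override def evictAfter(elements: JIterable[TimestampedValue[T]], size: Int,
                          window: W, ctx: Evictor.EvictorContext): Unit = ()
}

(Flink's built-in CountEvictor.of(2) behaves essentially the same way.)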

 

I am curious if anyone here has any ideas of what I might be able to do to solve this problem.

 

Thank you,

 

Seth Wiesman


Re: List State in RichWindowFunction leads to RocksDb memory leak

Aljoscha Krettek
Hi Seth,
yes, this is a thorny problem, but I actually see one additional possible solution (that will, however, break other possible use cases).

First, regarding your solution 1):
We are working on adding this for ProcessWindowFunction: https://issues.apache.org/jira/browse/FLINK-4953. ProcessWindowFunction is a more powerful interface that allows querying more context about a window firing, and it will replace the current WindowFunction in the future. Unfortunately, this doesn't help you in your current situation.

About 2): do you have any idea why the state is getting so big? Do you see the state of the second (count) window operator growing very large? The problem with count windows is that they never get garbage collected if the count required by the Trigger is not reached. If you have an evolving key space, this means that your state can grow forever.

The third solution I can think of is to make the state of a window function implicitly scoped to both the key and the window. Right now, state is "global" across time and scoped only to a key. If we also scoped it to the window, we could keep track of all state created for a window and garbage collect it once the window expires. This, however, will break things for people who rely on this state being global. I'll bring it up on the dev mailing list to see what people think. Are you also following that list? If so, you could chime in.

I'm afraid I don't have a good solution for you before Flink 1.3 comes out, other than writing your own custom operator or copying the WindowOperator.

What do you think?

Best,
Aljoscha


Re: List State in RichWindowFunction leads to RocksDb memory leak

swiesman

Appreciate you getting back to me.

 

ProcessWindowFunction does look interesting, and I expect it will be what I move to in the future. However, even if it already had the functionality I need today, I don't think I would be comfortable moving to a snapshot version so soon after migrating to 1.2.

 

With the count window: I was actually using a time window with a count trigger (stream.timeWindow().allowedLateness().trigger(Count.of(1))); a sketch of the full pipeline is below. The issue appeared to have less to do with state size expanding and more to do with checkpoint buffers being blocked somewhere along the pipeline. I decided to move away from this idea shortly after sending my last email, so I don't have any real insight into what was wrong.
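
Spelled out, the second window looked roughly like this (untested; the key selector, evictor, and window function names are placeholders):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger

stream
  .keyBy(_.key)                    // placeholder key selector
  .timeWindow(Time.hours(1))
  .allowedLateness(Time.days(7))
  .trigger(CountTrigger.of(1))     // fire on every element
  .evictor(new KeepLastTwoEvictor) // keep only the two most recent elements
  .apply(new DiffWindowFunction)   // placeholder diffing function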

 

I understand not wanting to break things for people who expect state to be global and do not expect to see any APIs change. :)

 

The solution I ended up settling on was copying the window operator and giving the window function access to the trigger context; luckily, that was a fairly trivial change to make. With it I am able to keep everything scoped to the correct namespace and clean everything up when the window is discarded. Is the plan for the context in ProcessWindowFunction to eventually have access to scoped partitioned state, or just timing information? There are several things coming down my pipeline that require coordination between window evaluations.

 

Thank you again for all the help.

 

Seth Wiesman

 

 


Re: List State in RichWindowFunction leads to RocksDb memory leak

swiesman

Also while I’ve got you, is it possible to get the job id from the runtime context?

 

Seth Wiesman

 


Re: List State in RichWindowFunction leads to RocksDb memory leak

Aljoscha Krettek
I created this Jira issue for per-window state: https://issues.apache.org/jira/browse/FLINK-5929. Would that suit your needs? If yes, please comment on the issue. I think it would be a very nice addition that opens up a lot of possibilities.
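
To give an idea, with per-window state a window function could do something like the following (API sketch only; names and shapes may change before this ships):

import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// Sketch: context.windowState would be scoped to the key *and* the window and
// garbage collected with it; clear() would run when the window is purged.
class DiffProcessFunction extends ProcessWindowFunction[Set[Int], Int, String, TimeWindow] {

  private val desc = new ListStateDescriptor("previous", classOf[Int])

  override def process(key: String, context: Context,
                       elements: Iterable[Set[Int]], out: Collector[Int]): Unit = {
    val state = context.windowState.getListState(desc)
    val previous = state.get().asScala.toSet
    state.clear()
    for (elem <- elements.head) {
      if (!previous.contains(elem)) out.collect(elem)
      state.add(elem) // store for the next firing of this window
    }
  }

  // Window-scoped state would be dropped automatically when the window expires.
  override def clear(context: Context): Unit = ()
}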

Regarding access to the job id in the RuntimeContext: I think that's not possible right now. Please open an issue for it if you like; it will look better coming from a user, I think. :-)

Best,
Aljoscha
