Hi,
I have the following operator:

    mainStream
      .coGroup(coStream)
      .where(_.uuid).equalTo(_.uuid)
      .window(GlobalWindows.create())
      .trigger(triggerWhenAllReceived)
      .apply(mergeElements)

TL;DR: It seems that the checkpointed state of the operator keeps growing forever, even though I clear the state and purge the buffered elements using a processing-time timer.

Details:

Basically, I have a main stream that gets elements from another stream. When it has received all the elements it has been waiting for, it outputs a new element created from the information of all the received elements.

To do so I use a GlobalWindow and a custom trigger. The custom trigger keeps two counters as state: the number of elements it has to receive (extracted from the element received from the main stream) and the number of elements it has received so far from the other stream. When the two counters have the same value, the trigger returns FIRE_AND_PURGE to output all the elements in the pane (I understand that each set of elements is stored in a pane defined by the global window and the UUID key).

To clean up the state (and to not keep elements waiting forever) I set up a processing-time timer which clears the state and returns FIRE_AND_PURGE to remove the buffered elements.

I must be missing something, because the checkpointed state keeps growing forever, so I suspect that the pane is not completely removed.

Gerard

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
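For readers following along, the bookkeeping described above can be modelled without Flink. This is only a minimal sketch of the described logic in plain Scala, not the actual trigger: `CompletionTracker`, `Decision` and the method names are hypothetical and chosen for illustration, and the counters are deliberately left untouched on completion so that cleanup happens only on the timeout path.

```scala
// Plain-Scala model of the per-key bookkeeping; no Flink dependency.
// CompletionTracker, Decision and all method names are hypothetical.
sealed trait Decision
case object Continue extends Decision
case object FireAndPurge extends Decision

final class CompletionTracker {
  // Number of co-stream elements announced by the main element, once it has arrived.
  private var expected: Option[Int] = None
  // Number of co-stream elements seen so far.
  private var received: Int = 0

  // Main-stream element: remember how many co-stream elements to wait for.
  def onMain(elementsToReceive: Int): Decision = {
    if (expected.isEmpty) expected = Some(elementsToReceive)
    decide()
  }

  // Co-stream element: count it.
  def onOther(): Decision = {
    received += 1
    decide()
  }

  // Timeout path: drop the counters and flush whatever is buffered.
  def onTimeout(): Decision = {
    clear()
    FireAndPurge
  }

  def clear(): Unit = {
    expected = None
    received = 0
  }

  // Fire only when the announced count is known and has been reached.
  private def decide(): Decision =
    if (expected.contains(received)) FireAndPurge else Continue
}
```

With this model, a key that receives one co-stream element, then a main element announcing two, then a second co-stream element produces `Continue`, `Continue`, `FireAndPurge` in that order.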
Hi,
Could you maybe show the code of your trigger?

Best,
Aljoscha

> On 15. Sep 2017, at 11:39, gerardg <[hidden email]> wrote:
>
> I have the following operator: [...]
Sure:
The application is configured to use processing time.

Thanks,

Gerard
Sure, but how does the Trigger actually work?
> On 15. Sep 2017, at 12:20, gerardg <[hidden email]> wrote:
>
> Sure: [...]
I'm using Nabble and it seems that it has removed the code between the raw tags.
Here it is again:

    import com.typesafe.scalalogging.LazyLogging
    import org.apache.flink.api.common.functions.ReduceFunction
    import org.apache.flink.api.common.state.{ReducingStateDescriptor, ValueStateDescriptor}
    import org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion
    import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
    import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
    import org.apache.flink.streaming.api.windowing.windows.Window

    object TriggerMerge {
      @SerialVersionUID(1L)
      private class Sum extends ReduceFunction[Long] {
        @throws[Exception]
        override def reduce(value1: Long, value2: Long): Long = value1 + value2
      }
    }

    class TriggerMerge(val timeout: Long)
        extends Trigger[TaggedUnion[MainElement, OtherElement], Window]
        with LazyLogging {

      val elementsToReceiveDesc =
        new ValueStateDescriptor[Int]("elements-to-receive", classOf[Int])
      val elementsReceivedDesc =
        new ReducingStateDescriptor[Long]("elements-received", new TriggerMerge.Sum, classOf[Long])

      override def onElement(element: TaggedUnion[MainElement, OtherElement],
                             timestamp: Long,
                             window: Window,
                             ctx: TriggerContext): TriggerResult = {
        var elementsToReceive = Option(ctx.getPartitionedState(elementsToReceiveDesc).value())
        var elementsReceived = Option(ctx.getPartitionedState(elementsReceivedDesc).get())

        // Update counters
        if (element.getOne != null) {
          elementsToReceive match {
            case Some(_) =>
              logger.error("Received two main elements with the same UUID.")
            case _ =>
              ctx.getPartitionedState(elementsToReceiveDesc).update(element.getOne.elementsToReceive)
          }
        }
        if (element.getTwo != null) {
          ctx.getPartitionedState(elementsReceivedDesc).add(1)
        }

        // Update deadline timeout
        val newDeadline = System.currentTimeMillis + timeout
        ctx.registerProcessingTimeTimer(newDeadline)

        // Get updated values
        elementsToReceive = Option(ctx.getPartitionedState(elementsToReceiveDesc).value())
        elementsReceived = Option(ctx.getPartitionedState(elementsReceivedDesc).get())

        // Check if everything is going as it should
        if (elementsToReceive.nonEmpty && elementsReceived.nonEmpty
            && elementsToReceive.get == elementsReceived.get) {
          TriggerResult.FIRE_AND_PURGE
        } else {
          TriggerResult.CONTINUE
        }
      }

      override def clear(window: Window, ctx: TriggerContext): Unit = {
        // Cleanup state
        ctx.getPartitionedState(elementsToReceiveDesc).clear()
        ctx.getPartitionedState(elementsReceivedDesc).clear()
      }

      override def onProcessingTime(time: Long, window: Window, ctx: TriggerContext): TriggerResult = {
        this.clear(window, ctx)
        TriggerResult.FIRE_AND_PURGE
      }

      override def onEventTime(time: Long, window: Window, ctx: TriggerContext): TriggerResult =
        TriggerResult.CONTINUE
    }
I might be able to understand better what is happening if I could see what is being stored in the state. Is there any way to read the RocksDB state?

Gerard
Hi Gerard,

I had a look at your Trigger implementation but did not spot anything suspicious that would cause the state size to grow.

- Use ctx.getCurrentProcessingTime instead of System.currentTimeMillis to make the Trigger easier to test (there are some test harnesses that can set the processing time manually).

    var elementsToReceive = ctx.
    var elementsReceived = ctx.
    ...

Maybe Aljoscha can check the code as well and see if he finds the reason why the state grows.

Best,
Fabian

2017-09-18 15:27 GMT+02:00 gerardg <[hidden email]>:
> I may be able to better know what is happening if I could get what is being [...]
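The testability hint above comes down to reading time from something the test can control. The following is a minimal sketch of that idea in plain Scala, without Flink: `Clock`, `SystemClock`, `ManualClock` and `DeadlineTracker` are hypothetical names introduced for illustration. In the actual trigger, ctx.getCurrentProcessingTime would play the role of the injected clock.

```scala
// Hypothetical names throughout: Clock, SystemClock, ManualClock, DeadlineTracker.
trait Clock {
  def currentTimeMillis: Long
}

// Production clock backed by the wall clock.
object SystemClock extends Clock {
  def currentTimeMillis: Long = System.currentTimeMillis()
}

// Test clock whose time only moves when the test advances it.
final class ManualClock(private var now: Long) extends Clock {
  def currentTimeMillis: Long = now
  def advance(millis: Long): Unit = now += millis
}

// Computes the cleanup deadline from the injected clock instead of the wall clock.
final class DeadlineTracker(clock: Clock, timeout: Long) {
  def nextDeadline(): Long = clock.currentTimeMillis + timeout
}
```

A test can then construct `new DeadlineTracker(new ManualClock(1000L), timeout = 500L)` and check deadlines deterministically, which is not possible when the code calls System.currentTimeMillis directly.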
Thanks Fabian, I'll take a look at these improvements.

I was wondering if the increasing state size could be due to the fact that the UUIDs used in the keyBy are randomly generated. Maybe even if I correctly delete all the state related to a given key, there is still some metadata related to the key lingering around.

Gerard
If that were the case, it would be a bug in Flink. As I said before, your implementation looked good to me. All state of the window and the trigger should be wiped if the trigger returns FIRE_AND_PURGE (or PURGE) and its clear() method is correctly implemented. We might need to file a JIRA for the issue.

Thanks,

2017-09-19 11:32 GMT+02:00 gerardg <[hidden email]>:
> Thanks Fabian, I'll take a look at these improvements. [...]
Hi,
Are the UUIDs randomly generated when calling .uuid, or are they assigned once so that .uuid returns the same UUID when called multiple times? The former would be problematic because we would not correctly assign state.

Best,
Aljoscha
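To illustrate the distinction being asked about, here is a small plain-Scala sketch. The types `UnstableElement` and `StableElement` and the helper `KeyStability.isStable` are hypothetical and not taken from the job in question. The point is only that a key extractor has to return the same value every time it is called on the same element.

```scala
import java.util.UUID

// Hypothetical element types, for illustration only.

// Unstable: `def` draws a fresh UUID on every access.
final class UnstableElement {
  def uuid: UUID = UUID.randomUUID()
}

// Stable: the UUID is assigned once at construction and only read afterwards.
final case class StableElement(uuid: UUID)

object KeyStability {
  // A key extractor is only usable for keyed state if repeated calls agree.
  def isStable[A](element: A)(key: A => UUID): Boolean =
    key(element) == key(element)
}
```

Here `KeyStability.isStable(StableElement(UUID.randomUUID()))(_.uuid)` holds, while the same check on `new UnstableElement` fails, because each call to `.uuid` produces a different key.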
The UUIDs are assigned.
As far as I can see (inspecting the metrics and how the task behaves), the mergeElements apply function receives all the elements (the main element and the other elements that it expects), so it seems that the correlation is correct. Also, nothing indicates that elements are lost inside the window (everything that enters goes out).

Thanks,

Gerard
I have prepared a repo that reproduces the issue:
https://github.com/GerardGarcia/flink-global-window-growing-state

Maybe this way it is easier to spot the error, or we can determine whether it is a bug.

Gerard
Thanks for creating the JIRA issue!

Best,
Fabian

2017-09-20 12:26 GMT+02:00 gerardg <[hidden email]>:
> I have prepared a repo that reproduces the issue: [...]