Clean GlobalWindow state

13 messages

Clean GlobalWindow state

gerardg
Hi,

I have the following operator:

mainStream
      .coGroup(coStream)
      .where(_.uuid).equalTo(_.uuid)
      .window(GlobalWindows.create())
      .trigger(triggerWhenAllReceived)
      .apply(mergeElements)

TL;DR: It seems that the checkpointed state of the operator keeps growing
forever, even though I clear the state and purge the buffered elements using a
processing-time timer in the trigger.

Details:

Basically, each element of my main stream waits for a number of elements from
another stream. Once it has received all the elements it was waiting for, the
operator outputs a new element built from the information in all the
received elements.

To do so, I use a GlobalWindow and a custom trigger. The custom trigger keeps
two counters as state: the number of elements it has to receive (extracted from
the element received from the main stream) and the number of elements it has
received so far from the other stream. When the two counters have the same
value, the trigger returns FIRE_AND_PURGE to output all the elements in the
pane (I understand that each set of elements is stored in a pane defined by
the global window and the UUID key).
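To make the counting logic above concrete, here is a minimal plain-Scala model of it with no Flink dependency. The mutable map stands in for keyed trigger state, and `Continue`/`FireAndPurge` stand in for `TriggerResult`. All names (`CountingModel`, `PaneState`, `onMain`, `onOther`) are hypothetical; this is a sketch of the idea, not the poster's actual trigger.

```scala
import scala.collection.mutable

// Stand-ins for Flink's TriggerResult values (hypothetical, model only).
sealed trait Result
case object Continue extends Result
case object FireAndPurge extends Result

// Per-key trigger state: the expected count (known once the main element
// arrives) and the number of "other" elements received so far.
final case class PaneState(toReceive: Option[Int] = None, received: Long = 0L)

class CountingModel {
  // One entry per UUID key, standing in for keyed/partitioned state.
  val state = mutable.Map.empty[String, PaneState]

  // Fire and drop the key's state once both counters match; otherwise store and wait.
  private def check(key: String, s: PaneState): Result =
    s.toReceive match {
      case Some(n) if n.toLong == s.received =>
        state.remove(key) // "purge": nothing is left behind for this key
        FireAndPurge
      case _ =>
        state.update(key, s)
        Continue
    }

  // Main element: records how many other elements to wait for.
  def onMain(key: String, expected: Int): Result = {
    val s = state.getOrElse(key, PaneState())
    check(key, s.copy(toReceive = Some(expected)))
  }

  // Other element: increments the received counter.
  def onOther(key: String): Result = {
    val s = state.getOrElse(key, PaneState())
    check(key, s.copy(received = s.received + 1))
  }
}
```

For example, `onOther("a")` followed by `onMain("a", 2)` both return `Continue`, and a second `onOther("a")` returns `FireAndPurge` and leaves no entry for `"a"`. The model deliberately drops the counters together with the pane when it fires, so a completed key leaves no state behind.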

To clean up the state (and to avoid keeping elements waiting forever), I set up a
processing-time timer that clears the state and returns FIRE_AND_PURGE to
remove the buffered elements.

I must be missing something, because the checkpointed state keeps growing
forever, so I suspect that the pane is not completely removed.

Gerard




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Clean GlobalWindow state

Aljoscha Krettek
Hi,

Could you maybe show the code of your trigger?

Best,
Aljoscha

> On 15. Sep 2017, at 11:39, gerardg <[hidden email]> wrote:


Re: Clean GlobalWindow state

gerardg
Sure:



The application is configured to use processing time.

Thanks,

Gerard




Re: Clean GlobalWindow state

Aljoscha Krettek
Sure, but how does the Trigger actually work?

> On 15. Sep 2017, at 12:20, gerardg <[hidden email]> wrote:


Re: Clean GlobalWindow state

gerardg
I'm using Nabble, and it seems to have removed the code between the raw tags.
Here it is again:

import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.{ReducingStateDescriptor, ValueStateDescriptor}
import org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.Window

object TriggerMerge {
  @SerialVersionUID(1L)
  private class Sum extends ReduceFunction[Long] {
    @throws[Exception]
    override def reduce(value1: Long, value2: Long): Long = value1 + value2
  }
}

class TriggerMerge(val timeout: Long)
    extends Trigger[TaggedUnion[MainElement, OtherElement], Window]
    with LazyLogging {

  val elementsToReceiveDesc =
    new ValueStateDescriptor[Int]("elements-to-receive", classOf[Int])
  val elementsReceivedDesc =
    new ReducingStateDescriptor[Long]("elements-received", new TriggerMerge.Sum, classOf[Long])

  override def onElement(element: TaggedUnion[MainElement, OtherElement],
                         timestamp: Long, window: Window, ctx: TriggerContext): TriggerResult = {
    var elementsToReceive = Option(ctx.getPartitionedState(elementsToReceiveDesc).value())
    var elementsReceived = Option(ctx.getPartitionedState(elementsReceivedDesc).get())

    // Update counters
    if (element.getOne != null) {
      elementsToReceive match {
        case Some(_) => logger.error("Received two main elements with the same UUID.")
        case _ =>
          ctx.getPartitionedState(elementsToReceiveDesc).update(element.getOne.elementsToReceive)
      }
    }
    if (element.getTwo != null) {
      ctx.getPartitionedState(elementsReceivedDesc).add(1)
    }

    // Update deadline timeout
    val newDeadline = System.currentTimeMillis + timeout
    ctx.registerProcessingTimeTimer(newDeadline)

    // Get updated values
    elementsToReceive = Option(ctx.getPartitionedState(elementsToReceiveDesc).value())
    elementsReceived = Option(ctx.getPartitionedState(elementsReceivedDesc).get())

    // Check if everything is going as it should
    if (elementsToReceive.nonEmpty && elementsReceived.nonEmpty &&
        elementsToReceive.get == elementsReceived.get) {
      TriggerResult.FIRE_AND_PURGE
    } else {
      TriggerResult.CONTINUE
    }
  }

  override def clear(window: Window, ctx: TriggerContext): Unit = {
    // Cleanup state
    ctx.getPartitionedState(elementsToReceiveDesc).clear()
    ctx.getPartitionedState(elementsReceivedDesc).clear()
  }

  override def onProcessingTime(time: Long, window: Window, ctx: TriggerContext): TriggerResult = {
    this.clear(window, ctx)
    TriggerResult.FIRE_AND_PURGE
  }

  override def onEventTime(time: Long, window: Window, ctx: TriggerContext): TriggerResult =
    TriggerResult.CONTINUE
}




Re: Clean GlobalWindow state

gerardg
I might get a better idea of what is happening if I could see what is being
stored in the state. Is there any way to read the RocksDB state?

Gerard




Re: Clean GlobalWindow state

Fabian Hueske-2
Hi Gerard,

I had a look at your Trigger implementation but did not spot anything suspicious that would cause the state size to grow.
However, I noticed a few things that can be improved:

- Use ctx.getCurrentProcessingTime instead of System.currentTimeMillis to make the Trigger easier to test (there are some test harnesses that let you set the processing time manually).
- Timers are not overwritten, so each timeout timer will result in a call to onProcessingTime(). It is not possible to delete timers (so you cannot prevent onProcessingTime() from being called multiple times), but you can save the most recent timer timestamp as ValueState[Long] and compare against that state to act only on the last timer call.
- You can get the state objects just once and apply multiple operations on the state object, i.e.,

val elementsToReceive = ctx.getPartitionedState(elementsToReceiveDesc)
val elementsReceived = ctx.getPartitionedState(elementsReceivedDesc)

elementsToReceive.update(x)
val cnt: Int = elementsToReceive.value()
...
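The "act only on the last timer" suggestion can be sketched as a plain-Scala simulation, assuming one timer is registered per element and that `now` plays the role of `ctx.getCurrentProcessingTime`. `latestDeadline` stands in for the suggested `ValueState[Long]`; `TimeoutModel`, `advanceTo`, and the other names are hypothetical, and no Flink API is used.

```scala
import scala.collection.mutable

// Hypothetical model: timers cannot be overwritten, so every element adds one,
// and the stored latest deadline tells a real timeout apart from a stale timer.
class TimeoutModel(timeout: Long) {
  // Stands in for a per-key ValueState[Long] holding the most recent deadline.
  val latestDeadline = mutable.Map.empty[String, Long]
  // All timers registered so far: (key, firing time).
  private var pendingTimers = List.empty[(String, Long)]

  // Called per element; `now` plays the role of ctx.getCurrentProcessingTime.
  def onElement(key: String, now: Long): Unit = {
    val deadline = now + timeout
    latestDeadline.update(key, deadline)
    pendingTimers = pendingTimers :+ ((key, deadline))
  }

  // Returns true only for the latest timer of a key, i.e. a real timeout.
  def onProcessingTime(key: String, time: Long): Boolean =
    latestDeadline.get(key) match {
      case Some(d) if d == time =>
        latestDeadline.remove(key) // clear the key's state on the real timeout
        true
      case _ =>
        false // stale timer from an earlier element: ignore it
    }

  // Fires every timer due at `now`, in time order; returns how many acted.
  def advanceTo(now: Long): Int = {
    val (due, later) = pendingTimers.partition(_._2 <= now)
    pendingTimers = later
    due.sortBy(_._2).count { case (k, t) => onProcessingTime(k, t) }
  }
}
```

With a timeout of 10 and elements for key `"a"` at times 0 and 5, the timer at 10 is ignored as stale and only the timer at 15 clears the state.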

Maybe Aljoscha can check the code as well and see if he finds the reason why the state grows.

Best, Fabian

2017-09-18 15:27 GMT+02:00 gerardg <[hidden email]>:

Re: Clean GlobalWindow state

gerardg
Thanks Fabian, I'll take a look at these improvements.

I was wondering whether the increasing state size could be caused by the UUIDs
used in the keyBy being randomly generated. Maybe, even if I correctly delete
all the state related to a given key, some metadata related to the key is
still lingering around.

Gerard




Re: Clean GlobalWindow state

Fabian Hueske-2
If that were the case, it would be a bug in Flink.
As I said before, your implementation looked good to me.
All window and trigger state should be wiped if the trigger returns FIRE_AND_PURGE (or PURGE) and its clear() method is correctly implemented.

I'll CC Aljoscha again for his opinion.
We might need to file a JIRA for the issue.

Thanks,
Fabian

2017-09-19 11:32 GMT+02:00 gerardg <[hidden email]>:

Re: Clean GlobalWindow state

Aljoscha Krettek
Hi,

Are the UUIDs randomly generated each time .uuid is called, or are they assigned once, so that .uuid returns the same UUID no matter how often it is called? The former would be problematic, because we would not assign state correctly.

Best,
Aljoscha
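The question above comes down to whether the key selector is deterministic. This plain-Scala sketch contrasts a UUID assigned once with one generated on every access to `.uuid`; the element classes and `KeyStability.isStable` are hypothetical and not part of the poster's code or of Flink.

```scala
import java.util.UUID

// Assigned once: the UUID is fixed when the element is created.
final case class AssignedElement(uuid: UUID)

// Generated per call: every access to .uuid yields a fresh random value.
final class RandomElement {
  def uuid: UUID = UUID.randomUUID()
}

object KeyStability {
  // A key selector must return the same key each time it is applied to the same
  // element. Flink may apply it more than once per record (for example when
  // partitioning upstream and again in the receiving operator), so a changing
  // key would route the record and its state inconsistently.
  def isStable[A](element: A)(key: A => UUID): Boolean =
    key(element) == key(element)
}
```

With `AssignedElement`, both applications of `_.uuid` agree; with `RandomElement`, they (almost surely) differ, which is the problematic case.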
On 19. Sep 2017, at 11:41, Fabian Hueske <[hidden email]> wrote:

If this would be the case, that would be a bug in Flink.
As I said before, your implementation looked good to me.
All state of window and trigger should be wiped if the trigger returns FIRE_AND_PURGE (or PURGE) and it's clean() method is correctly implemented.

I'll CC Aljoscha again for his opinion.
We might need to file a JIRA for the issue.

Thanks,
Fabian

2017-09-19 11:32 GMT+02:00 gerardg <[hidden email]>:
Thanks Fabian, I'll take a look to these improvements.

I was wondering if the increasing state size could be due to that the UUID
used in the keyBy are randomly generated. Maybe even if I correctly delete
all the state related to a given key there is still some metadata related to
the key wandering around.


Reply | Threaded
Open this post in threaded view
|

Re: Clean GlobalWindow state

gerardg
The UUIDs are assigned.

As far as I can see (inspecting the metrics and how the task behaves), the
mergeElements apply function receives all the elements (the main element and
the other elements it expects), so the correlation seems to be correct. Also,
nothing indicates that elements are being lost inside the window (everything
that enters also leaves).

Thanks,

Gerard




Re: Clean GlobalWindow state

gerardg
I have prepared a repo that reproduces the issue:
https://github.com/GerardGarcia/flink-global-window-growing-state

Maybe this way it is easier to spot the error, or to determine whether it is
a bug.

Gerard




Re: Clean GlobalWindow state

Fabian Hueske-2
Thanks for creating the JIRA issue!

Best, Fabian

2017-09-20 12:26 GMT+02:00 gerardg <[hidden email]>: