Collapsing watermarks after keyby

Padarn Wilson-2
Hi Flink Mailing List,

Long story short - I want to somehow collapse watermarks at an operator across keys, so that keys with lagging watermarks do not hold the others back. Details below:

---

I have an application in which I want to perform the following sequence of steps. Assume each record has the fields (time, user, location, action):

-> Read source
-> KeyBy (UserId, Location) 
-> EventTimeSessionWindow (5 min gap) - results in (User Location Session)
-> TriggerOnFirst event
-> KeyBy (Location)
-> SlidingEventTimeWindow (5 min length, 5 second slide)
-> Count

The end intention is to count the number of unique users in a given location - the EventTimeSessionWindow is used to make sure users are only counted once.
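
To get a feel for the last windowing step, here is a minimal plain-Java sketch (not Flink code) of how a sliding window assigner can map one timestamp to window start times. The class and method names are hypothetical, and the arithmetic is an assumption about how such an assigner typically works rather than a copy of Flink's implementation. With a 5 minute length and a 5 second slide, every session result lands in 60 overlapping windows, and with a default event-time trigger each of those would only emit once the watermark reaches the end of that window.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of sliding-window assignment; hypothetical names, not Flink code. */
final class SlidingWindowSketch {

    /** Returns the start timestamps of every sliding window that contains the given timestamp. */
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start at or before the timestamp (floorMod also handles negative timestamps).
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Walk backwards one slide at a time while [start, start + size) still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 5 * 60 * 1000L; // 5 minute window length
        long slide = 5 * 1000L;     // 5 second slide
        List<Long> starts = windowStartsFor(12_345L, size, slide);
        System.out.println(starts.size() + " windows, newest starts at " + starts.get(0));
    }
}
```

The count of 60 follows directly from length divided by slide (300 s / 5 s).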

So I created a custom Trigger, which is the same as CountTrigger but has the following onElement function:

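@Override
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
    // Element counter kept in the trigger's partitioned state.
    ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
    count.add(1L);
    long seen = count.get();
    if (seen == maxCount) {
        // Emit the window contents once, when the maxCount-th element arrives.
        return TriggerResult.FIRE_AND_PURGE;
    } else if (seen > maxCount) {
        // Discard every later element without emitting again.
        return TriggerResult.PURGE;
    }
    return TriggerResult.CONTINUE;
}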
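
The branch structure of the onElement logic above can be checked in isolation with a small plain-Java model. This is only a sketch: the enum and class below are hypothetical stand-ins rather than Flink types, and the model assumes the counter is not reset when the window contents are purged.

```java
/** Plain-Java model of the trigger decision; the enum and class are stand-ins, not Flink types. */
final class FirstNTriggerSketch {

    enum Result { CONTINUE, FIRE_AND_PURGE, PURGE }

    private final long maxCount;
    private long count; // stands in for the ReducingState<Long> counter

    FirstNTriggerSketch(long maxCount) {
        this.maxCount = maxCount;
    }

    /** Mirrors the branches of onElement: fire once at maxCount, afterwards only purge. */
    Result onElement() {
        count++;
        if (count == maxCount) {
            return Result.FIRE_AND_PURGE;
        } else if (count > maxCount) {
            return Result.PURGE;
        }
        return Result.CONTINUE;
    }

    public static void main(String[] args) {
        FirstNTriggerSketch trigger = new FirstNTriggerSketch(1);
        for (int i = 1; i <= 3; i++) {
            System.out.println("element " + i + " -> " + trigger.onElement());
        }
    }
}
```

With maxCount set to 1, the first element fires and every later element in the same session is dropped, which is the "count each user once" behaviour described above. Note that this decision depends only on element counts and never on event time.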

But my final SlidingEventTimeWindow does not fire properly. I assume this is because some users have session windows that are not yet closed, so the watermark for those keys runs behind and holds back the watermark of the SlidingEventTimeWindow as well.

What I feel like I want to achieve is essentially setting the watermark of the SlidingEventTimeWindow operator to be the maximum (with lateness) of the input keys, rather than the minimum, but I cannot tell if this is possible, and if not, what another approach could be.

Thanks,
Padarn

Re: Collapsing watermarks after keyby

Till Rohrmann
Hi Padarn,

Flink does not generate watermarks per key. At the moment, watermarks are always global. I would therefore suspect that the problem lies in generating watermarks at all. Could it be that your input data does not span a period longer than 5 minutes and also does not terminate? Another problem could be the CountTrigger, which as far as I can tell does not react to the window's end time: its onEventTime method simply returns TriggerResult.CONTINUE, and I think this will cause the window not to fire. A working example program with example input would be helpful for further debugging.

Cheers,
Till

Re: Collapsing watermarks after keyby

Padarn Wilson-2
Hi Till,

I will work on an example, but I’m a little confused by how keyBy and watermarks work in this case. The documentation says:


Some operators consume multiple input streams; a union, for example, or operators following a keyBy(…) or partition(…) function. Such an operator’s current event time is the minimum of its input streams’ event times. As its input streams update their event times, so does the operator.


This implies to me that the keyBy splits the watermark?

Re: Collapsing watermarks after keyby

Padarn Wilson-2
Just to add: by printing intermediate results I see that I definitely have more than five minutes of data, and by windowing without the session windows I see that event time watermarks do seem to be generated as expected.

Thanks for your help and time.

Padarn

Re: Collapsing watermarks after keyby

Till Rohrmann
Operators with multiple inputs emit the minimum of their inputs' watermarks downstream. In the case of a keyBy, this means that the watermark is sent to all downstream consumers.

Cheers,
Till
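
The minimum rule described in this reply can be illustrated with a small plain-Java model. It is a sketch under assumptions, not Flink's implementation: the class name is hypothetical, and the "channels" stand for parallel upstream subtasks feeding one operator instance, not for keys.

```java
import java.util.Arrays;

/** Plain-Java model of an operator tracking event time over several input channels; not Flink code. */
final class MinWatermarkSketch {

    private final long[] channelWatermarks;
    private long operatorWatermark = Long.MIN_VALUE;

    MinWatermarkSketch(int inputChannels) {
        channelWatermarks = new long[inputChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one channel and returns the operator's resulting watermark. */
    long onWatermark(int channel, long watermark) {
        // A channel's watermark never moves backwards.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        // The operator's event time is the minimum over all channels and also never moves backwards.
        operatorWatermark = Math.max(operatorWatermark, min);
        return operatorWatermark;
    }

    public static void main(String[] args) {
        MinWatermarkSketch op = new MinWatermarkSketch(2);
        System.out.println(op.onWatermark(0, 100)); // channel 1 has sent nothing yet
        System.out.println(op.onWatermark(1, 40));
        System.out.println(op.onWatermark(0, 500));
        System.out.println(op.onWatermark(1, 120));
    }
}
```

In this model the operator only advances when its slowest channel advances. Nothing in it is tracked per key, which matches the statement that watermarks are global.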

Re: Collapsing watermarks after keyby

Padarn Wilson-2
Okay. I think I must still be misunderstanding something here. I will work on building a unit test around this; hopefully that clears up my confusion.

Thank you,
Padarn

Re: Collapsing watermarks after keyby

Padarn Wilson-2
I created a small test to see if I could replicate this... but I couldn't :-) Below is my code, which provides a counterexample. It is not very clean, but perhaps it is useful for someone else in the future:


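// The imports and the Tester case class are not part of the original post; they are
// assumed here (Flink 1.7-era Scala API, ScalaTest 3.0.x).
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
import org.scalatest.{FunSuite, Matchers}

case class Tester(id1: String, id2: String, time: Long)

class SessionWindowTest extends FunSuite with Matchers {

  test("Should advance watermark correctly") {

    val startTime = 0L

    val elements1 = List[Tester](
      Tester("id1:a", "id2:a", startTime),
      Tester("id1:b", "id2:a", startTime + 1),
      Tester("id1:b", "id2:a", startTime + 100),
      Tester("id1:a", "id2:a", startTime + 1)
    )

    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)
    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    streamEnv.getConfig.disableSysoutLogging()

    // Emits a watermark equal to the timestamp of every element.
    class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[Tester] {

      override def extractTimestamp(element: Tester, previousElementTimestamp: Long): Long = {
        element.time
      }

      override def checkAndGetNextWatermark(lastElement: Tester, extractedTimestamp: Long): Watermark = {
        new Watermark(extractedTimestamp)
      }
    }

    // Finite source that replays the test elements in list order.
    val stream = streamEnv.addSource(new SourceFunction[Tester]() {
      override def run(ctx: SourceFunction.SourceContext[Tester]): Unit = {
        elements1.foreach {
          ctx.collect
        }
      }
      override def cancel(): Unit = {}
    }).assignTimestampsAndWatermarks(new PunctuatedAssigner)

    // First stage: session windows keyed by id1, emitting the last element of each session.
    val sessionsStream = stream
      .keyBy(_.id1)
      .window(EventTimeSessionWindows.withGap(Time.milliseconds(2)))
      .apply(
        (key: String, windowInfo, iter: Iterable[Tester], collector: Collector[Tester]) => {
          val elements = iter.toList
          println(s"Session window. Elements: $elements")
          println(windowInfo)
          collector.collect(elements.last)
        })

    // Second stage: tumbling windows keyed by id2 over the session results.
    val countStream = sessionsStream
      .keyBy(_.id2)
      .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
      .apply(
        (key: String, windowInfo, iter: Iterable[Tester], collector: Collector[Tester]) => {
          val elements = iter.toList
          println(s"Tumbling window. Elements: $elements ${windowInfo.getStart} ${windowInfo.getEnd}")
          collector.collect(elements.last)
        })

    sessionsStream.print()
    countStream.print()

    streamEnv.execute()

  }

}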
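
As a rough cross-check of which sessions the test data can form, the following plain-Java sketch merges one key's timestamps into gap-based sessions. It is a simplified model with hypothetical names, not Flink's session window logic. It assumes each element initially spans [t, t + gap), that overlapping or touching spans merge, and it ignores arrival order, watermarks, and late-data handling, so an element that arrives behind the watermark in the real job may be treated differently there.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of gap-based session merging for one key; not Flink code. */
final class SessionMergeSketch {

    /** Returns {start, end} pairs for one key, where each element initially spans [t, t + gap). */
    static List<long[]> sessions(long[] timestamps, long gapMs) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        for (long t : sorted) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && t <= last[1]) {
                // Overlaps (or touches) the current session: extend its end.
                last[1] = Math.max(last[1], t + gapMs);
            } else {
                // Too far from the previous session: start a new one.
                result.add(new long[] {t, t + gapMs});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Timestamps per id1 key, taken from the test data above; gap of 2 ms.
        for (long[] s : sessions(new long[] {0, 1}, 2)) {
            System.out.println("id1:a " + Arrays.toString(s));
        }
        for (long[] s : sessions(new long[] {1, 100}, 2)) {
            System.out.println("id1:b " + Arrays.toString(s));
        }
    }
}
```

By timestamps alone, id1:a forms a single session and id1:b forms two, since its events at 1 and 100 are far more than 2 ms apart.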
