watermark trigger doesn't check whether element's timestamp is passed

Manu Zhang
Hi,

Say I have a window state of List(("a", 1:00), ("b", 1:03), ("c", 1:06)) that is triggered to emit whenever the watermark passes the timestamp of an element. For example:

on watermark(1:01), List(("a", 1:00)) is emitted
on watermark(1:04), List(("a", 1:00), ("b", 1:03)) is emitted
on watermark(1:07), List(("a", 1:00), ("b", 1:03), ("c", 1:06)) is emitted

It seems that if ("c", 1:06) is processed before watermark(1:04), then List(("a", 1:00), ("b", 1:03), ("c", 1:06)) will be emitted on watermark(1:04). This is incorrect, since elements with timestamps between 1:04 and 1:06 may not have arrived yet.

I guess this is because the watermark trigger doesn't check whether each element's timestamp has been passed by the watermark.
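To make the expected behaviour concrete, here is a small plain-Java simulation. It is not Flink API code: the class and method names are made up for illustration, and timestamps are encoded as minutes past 1:00. It contrasts a firing that emits the whole window state with one that only emits elements whose timestamps the watermark has already passed, for the case where ("c", 1:06) is buffered before watermark(1:04).

```java
import java.util.ArrayList;
import java.util.List;

// Simulation only, not Flink code. Timestamps are minutes past 1:00.
class WatermarkFilterDemo {

    // One buffered element of the window state: a value and its event timestamp.
    static final class Element {
        final String value;
        final long timestamp;

        Element(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return "(" + value + ", 1:" + String.format("%02d", timestamp) + ")";
        }
    }

    // A firing that ignores timestamps emits everything buffered so far.
    static List<Element> emitAll(List<Element> state) {
        return new ArrayList<>(state);
    }

    // A firing that respects the watermark emits only elements it has passed.
    static List<Element> emitPassed(List<Element> state, long watermark) {
        List<Element> out = new ArrayList<>();
        for (Element e : state) {
            if (e.timestamp <= watermark) {
                out.add(e);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // ("c", 1:06) is already buffered when watermark(1:04) arrives.
        List<Element> state = List.of(
                new Element("a", 0), new Element("b", 3), new Element("c", 6));
        System.out.println("emit all:    " + emitAll(state));      // includes ("c", 1:06)
        System.out.println("emit passed: " + emitPassed(state, 4)); // only a and b
    }
}
```

The second variant matches the three expected emissions listed above; the first reproduces the reported problem.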

Please correct me if any of the above is not right.

Thanks,
Manu Zhang



Re: watermark trigger doesn't check whether element's timestamp is passed

Aljoscha Krettek
Hi,
with some additional information we might be able to figure this out together. What specific combination of WindowAssigner/Trigger are you using for your example and what is the input stream (including watermarks)?

Cheers,
Aljoscha

On Mon, 24 Oct 2016 at 06:30 Manu Zhang <[hidden email]> wrote:

Re: watermark trigger doesn't check whether element's timestamp is passed

Manu Zhang
Hi Aljoscha,

Thanks for your response. My use case is tracking user trajectories based on the page view events generated when users visit a website. The input is a stream of PageView(userId, url, eventTimestamp) with watermarks (= eventTimestamp - duration). I'm trying SessionWindows with an event-time trigger. Note that we can't wait for the end of the session window because of latency. Instead, we want to emit the user trajectories whenever the watermark passes the event time of a buffered PageView. I tried ContinuousEventTimeTrigger and a custom trigger that sets a timer on each element's timestamp. With both triggers I've observed a problem like the following (e.g. with a session gap of 5):

PageView("user1", "http://foo", 1)
PageView("user1", "http://foo/bar", 2)
Watermark(1) => emit UserTrajectory("user1", "http://foo -> http://foo/bar", [1,6])
PageView("user1", "http://foo/bar/foobar", 5)
Watermark(4) => emit UserTrajectory("user1", "http://foo -> http://foo/bar -> http://foo/bar/foobar", [1, 10])

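The last URL in each emitted trajectory (http://foo/bar at Watermark(1) and http://foo/bar/foobar at Watermark(4)) should not be included, since its timestamp has not yet been passed by the watermark and events with earlier timestamps may still arrive.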
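The watermarks in this trace are consistent with a generator that emits the highest event timestamp seen so far minus a fixed delay of 1. The sketch below is a plain-Java simulation under that assumption; the delay and the class name BoundedDelayWatermarks are hypothetical. It is similar in spirit to Flink's BoundedOutOfOrdernessTimestampExtractor, but it is not that class.

```java
// Simulation only, not Flink code; the class name and delay are hypothetical.
class BoundedDelayWatermarks {
    private final long delay;
    private long maxTimestamp = Long.MIN_VALUE;

    BoundedDelayWatermarks(long delay) {
        this.delay = delay;
    }

    // Record an event timestamp and return the watermark that could be emitted next:
    // the highest timestamp seen so far minus the allowed delay.
    long onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        return maxTimestamp - delay;
    }

    public static void main(String[] args) {
        BoundedDelayWatermarks watermarks = new BoundedDelayWatermarks(1); // assumed delay
        long[] pageViewTimestamps = {1, 2, 5}; // http://foo, http://foo/bar, http://foo/bar/foobar
        for (long ts : pageViewTimestamps) {
            System.out.println("PageView@" + ts + " -> Watermark(" + watermarks.onEvent(ts) + ")");
        }
    }
}
```

It yields Watermark(1) after the PageView at 2 and Watermark(4) after the PageView at 5, matching the trace. In a real job watermarks are often emitted periodically rather than after every element, which may be why no Watermark(0) appears in the trace.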


Thanks,
Manu


On Tue, Oct 25, 2016 at 1:36 AM Aljoscha Krettek <[hidden email]> wrote:

Re: watermark trigger doesn't check whether element's timestamp is passed

Aljoscha Krettek
Hi,
Is that example input/output what you would like to achieve, or what you are currently seeing with Flink? I think your use case requires a custom Trigger that works like the event-time trigger but additionally registers a timer for each element at which you want to emit.
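A rough model of the suggested trigger, written as a plain-Java simulation rather than against the Flink Trigger API: it keeps the end-of-window timer that an event-time trigger would register and adds one event-time timer per element timestamp. The window end is fixed at 9 for simplicity (session-window merging is ignored), and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Simulation only, not Flink code; names are hypothetical.
class PerElementTimerTrigger {
    // Pending event-time timers; a set, so equal timestamps collapse into one timer.
    private final TreeSet<Long> timers = new TreeSet<>();

    PerElementTimerTrigger(long windowMaxTimestamp) {
        timers.add(windowMaxTimestamp); // the end-of-window timer of an event-time trigger
    }

    // Called for every element added to the window.
    void onElement(long elementTimestamp) {
        timers.add(elementTimestamp); // the additional per-element timer
    }

    // Advance the watermark and return the timers that fire, in ascending order.
    List<Long> onWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= watermark) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }

    public static void main(String[] args) {
        PerElementTimerTrigger trigger = new PerElementTimerTrigger(9); // window end fixed for simplicity
        trigger.onElement(1);
        trigger.onElement(2);
        System.out.println("Watermark(1) fires " + trigger.onWatermark(1));   // [1]
        trigger.onElement(5);
        System.out.println("Watermark(4) fires " + trigger.onWatermark(4));   // [2]
        System.out.println("Watermark(10) fires " + trigger.onWatermark(10)); // [5, 9]
    }
}
```

Replaying the trace shows the timer at 1 firing on Watermark(1) and the timer at 2 firing on Watermark(4). The trigger only decides when to fire; on each firing the window function still receives the whole window state.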

Cheers,
Aljoscha

On Wed, 26 Oct 2016 at 04:04 Manu Zhang <[hidden email]> wrote:

Re: watermark trigger doesn't check whether element's timestamp is passed

Manu Zhang
Hi,

It's what I'm currently seeing. If a timer fires before the end of the window, elements in the window state whose timestamps are later than the timer are emitted as well. That's a problem for an event-time trigger.
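One way to make this visible is to compare the timestamp of the timer that caused a firing with the timestamps of the elements that were emitted, and list the ones that are later. This plain-Java sketch uses hypothetical names and assumes the per-element timers from the trace (the timer at 1 fires on Watermark(1), the timer at 2 fires on Watermark(4)).

```java
import java.util.ArrayList;
import java.util.List;

// Simulation only, not Flink code; names are hypothetical.
class PrematureElementsCheck {

    static final class PageView {
        final String url;
        final long timestamp;

        PageView(String url, long timestamp) {
            this.url = url;
            this.timestamp = timestamp;
        }
    }

    // Given everything the window function received when a timer fired, return
    // the URLs whose timestamps are later than that timer (emitted too early).
    static List<String> premature(List<PageView> emitted, long timerTimestamp) {
        List<String> urls = new ArrayList<>();
        for (PageView pv : emitted) {
            if (pv.timestamp > timerTimestamp) {
                urls.add(pv.url);
            }
        }
        return urls;
    }

    public static void main(String[] args) {
        PageView foo = new PageView("http://foo", 1);
        PageView bar = new PageView("http://foo/bar", 2);
        PageView foobar = new PageView("http://foo/bar/foobar", 5);
        // Watermark(1): the timer at 1 fires while foo and foo/bar are buffered.
        System.out.println(premature(List.of(foo, bar), 1));         // [http://foo/bar]
        // Watermark(4): the timer at 2 fires after foobar has been buffered.
        System.out.println(premature(List.of(foo, bar, foobar), 2)); // [http://foo/bar/foobar]
    }
}
```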

Thanks,
Manu


On Thu, Oct 27, 2016 at 10:29 PM Aljoscha Krettek <[hidden email]> wrote:

Re: watermark trigger doesn't check whether element's timestamp is passed

Aljoscha Krettek
Hmm, I don't completely understand what's going on. Could you maybe post an example, with the trigger code that shows this behaviour?

Cheers,
Aljoscha

On Thu, 27 Oct 2016 at 17:12 Manu Zhang <[hidden email]> wrote:

Re: watermark trigger doesn't check whether element's timestamp is passed

Manu Zhang
Yes, here's the example https://github.com/manuzhang/flink/blob/pv/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/session/PageViewSessionWindowing.scala

If you print the timer timestamps and compare them with the "PageView" timestamps in the output, you can see what I mean.

I think the recently introduced TimelyFlatMapFunction is close to what I want to achieve. It would be great if we could query time information in the window function, so I filed https://issues.apache.org/jira/browse/FLINK-4953
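For comparison, here is a plain-Java simulation of the keyed, timer-based approach that a function like TimelyFlatMapFunction allows: buffer PageViews per user, register an event-time timer at each PageView's timestamp, and, when a timer fires, emit only the PageViews whose timestamps are not later than that timer. The key difference from a window function is that the timer callback knows its own timestamp. All names are hypothetical, and session gaps and state cleanup are left out.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;
import java.util.TreeMap;
import java.util.TreeSet;

// Simulation only, not Flink code: keyed buffering with per-element event-time timers.
class KeyedTimerTrajectories {

    static final class PageView {
        final String userId;
        final String url;
        final long timestamp;

        PageView(String userId, String url, long timestamp) {
            this.userId = userId;
            this.url = url;
            this.timestamp = timestamp;
        }
    }

    private final Map<String, List<PageView>> buffers = new HashMap<>();
    // Timer timestamp -> users with a timer at that time (duplicates collapse).
    private final TreeMap<Long, Set<String>> timers = new TreeMap<>();

    // Buffer the element under its key and register a timer at its timestamp.
    void processElement(PageView pv) {
        buffers.computeIfAbsent(pv.userId, k -> new ArrayList<>()).add(pv);
        timers.computeIfAbsent(pv.timestamp, t -> new TreeSet<>()).add(pv.userId);
    }

    // Advance the watermark and fire every timer it has passed, earliest first.
    List<String> onWatermark(long watermark) {
        List<String> out = new ArrayList<>();
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            Map.Entry<Long, Set<String>> due = timers.pollFirstEntry();
            for (String userId : due.getValue()) {
                out.add(onTimer(userId, due.getKey()));
            }
        }
        return out;
    }

    // The timer callback knows its own timestamp, so it can skip later PageViews.
    private String onTimer(String userId, long timerTimestamp) {
        List<PageView> views = new ArrayList<>(buffers.get(userId));
        views.sort(Comparator.comparingLong((PageView pv) -> pv.timestamp));
        StringJoiner path = new StringJoiner(" -> ");
        for (PageView pv : views) {
            if (pv.timestamp <= timerTimestamp) {
                path.add(pv.url);
            }
        }
        return userId + ": " + path;
    }

    public static void main(String[] args) {
        KeyedTimerTrajectories f = new KeyedTimerTrajectories();
        f.processElement(new PageView("user1", "http://foo", 1));
        f.processElement(new PageView("user1", "http://foo/bar", 2));
        System.out.println(f.onWatermark(1)); // [user1: http://foo]
        f.processElement(new PageView("user1", "http://foo/bar/foobar", 5));
        System.out.println(f.onWatermark(4)); // [user1: http://foo -> http://foo/bar]
    }
}
```

Replaying the trace, Watermark(4) now emits only http://foo -> http://foo/bar, because the buffered PageView at 5 is later than the timer at 2.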

Thanks for your time.

Manu

On Mon, Oct 31, 2016 at 10:29 PM Aljoscha Krettek <[hidden email]> wrote:

Re: watermark trigger doesn't check whether element's timestamp is passed

Aljoscha Krettek
Ah, I finally understand it. You would need a way to query the current watermark in the window function so that you only emit the elements whose timestamps are lower than the watermark.

When the window fires again, do you want to emit the elements that you already emitted during the last firing? If not, I think you also need an evictor that removes the elements whose timestamps are lower than the watermark from the window. With this FLIP https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata we should be able to extend the WindowFunction Context to also provide the current watermark. With this recent PR https://github.com/apache/flink/pull/2736 you would be able to evict elements from the window state after the window function has been called.
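The proposal can be modelled in plain Java, assuming the window function is handed the current watermark (what FLIP-2 would add) and that an evict-after step can run once the function returns (what the PR would enable). This is a simulation with hypothetical names, not the Flink Evictor or WindowFunction API. The hypothetical evictAfterFiring flag switches between re-emitting earlier elements and emitting each element once.

```java
import java.util.ArrayList;
import java.util.List;

// Simulation only, not Flink code. Timestamps are minutes past 1:00.
class WatermarkAwareWindow {

    static final class Element {
        final String value;
        final long timestamp;

        Element(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final List<Element> windowState = new ArrayList<>();
    private final boolean evictAfterFiring;

    WatermarkAwareWindow(boolean evictAfterFiring) {
        this.evictAfterFiring = evictAfterFiring;
    }

    void add(Element e) {
        windowState.add(e);
    }

    // One firing: the window function sees the current watermark and emits only
    // elements it has passed; an optional evict-after step then removes them.
    List<String> fire(long currentWatermark) {
        List<String> emitted = new ArrayList<>();
        for (Element e : windowState) {
            if (e.timestamp <= currentWatermark) {
                emitted.add(e.value);
            }
        }
        if (evictAfterFiring) {
            windowState.removeIf(e -> e.timestamp <= currentWatermark);
        }
        return emitted;
    }

    public static void main(String[] args) {
        for (boolean evict : new boolean[] {false, true}) {
            WatermarkAwareWindow window = new WatermarkAwareWindow(evict);
            window.add(new Element("a", 0));
            window.add(new Element("b", 3));
            window.add(new Element("c", 6)); // arrives before watermark(1:04)
            System.out.println("evict=" + evict + ": "
                    + window.fire(1) + " " + window.fire(4) + " " + window.fire(7));
        }
    }
}
```

With the elements from the first message, evictAfterFiring=false yields the cumulative lists [a], [a, b], [a, b, c], and evictAfterFiring=true yields [a], [b], [c].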

Cheers,
Aljoscha

On Tue, 1 Nov 2016 at 02:27 Manu Zhang <[hidden email]> wrote:

Re: watermark trigger doesn't check whether element's timestamp is passed

Manu Zhang
Thanks. Ideally the window would fire each time the watermark passes an element in the window, but that requires a custom trigger as well as FLIP-2. The enhanced window evictor will help avoid re-emitting the elements from the last firing.

Are the discussions on FLIP-2 still ongoing?
Are there any open JIRAs or PRs? (The proposed `ProcessWindowFunction` would be sufficient for my case.)
Is there a workaround for my case right now?

Thanks again for following up on this.
Manu

On Wed, Nov 2, 2016 at 1:07 AM Aljoscha Krettek <[hidden email]> wrote:

Re: watermark trigger doesn't check whether element's timestamp is passed

aj.h

Hi Manu, Aljoscha,

I had been interested in implementing FLIP-2, but I haven't been able to make time for it. There is no implementation yet that I'm aware of, and I'll gladly step aside (or help out however I can) if you or anyone else is interested in taking charge of it.

That said, I'm also not sure if discussions are ongoing. I had hoped to prototype the proposal as is, to have something more concrete to discuss.

Cheers,
aj

On Nov 1, 2016 3:24 PM, "Manu Zhang" <[hidden email]> wrote:
Thanks.  The ideal case is to fire after watermark past each element from the window but that requires a custom trigger and FLIP-2 as well. The enhanced window evictor will help to avoid the last firing. 

Are the discussions on FLIP-2 still going on ? 
Are there any opening JIRAs or PRs ? (The proposed `ProcessWindowFunction` will be sufficient for my case)
Is there a workaround now for my case ?

Thanks again for following through this.
Manu

On Wed, Nov 2, 2016 at 1:07 AM Aljoscha Krettek <[hidden email]> wrote:
Ah, I finally understand it. You would a way to query the current watermark in the window function to only emit those elements where the timestamp is lower than the watermark. 

When the window fires again, do you want to emit elements that you emitted during the last firing again? If not, I think you also need to use an evictor to evict the elements from the window where the timestamp is lower than the watermark. With this FLIP https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata we should be able to extend the WindowFunction Context to also provide the current watermark. With this recent PR https://github.com/apache/flink/pull/2736 you would be able to evict elements from the window state after the window function was called.

Cheers,
Aljoscha

On Tue, 1 Nov 2016 at 02:27 Manu Zhang <[hidden email]> wrote:
Yes, here's the example https://github.com/manuzhang/flink/blob/pv/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/session/PageViewSessionWindowing.scala

If you print and compare the timestamp of timer with that of "PageView" in the outputs, you could see what I mean.

I think the recently introduced TimelyFlatMapFunction is close to what I want to achieve. It will be great if we can query time information in the window function so I filed https://issues.apache.org/jira/browse/FLINK-4953

Thanks for your time.

Manu

On Mon, Oct 31, 2016 at 10:29 PM Aljoscha Krettek <[hidden email]> wrote:
Hmm, I don't completely understand what's going on. Could you maybe post an example, with the trigger code that shows this behaviour?

Cheers,
Aljoscha

On Thu, 27 Oct 2016 at 17:12 Manu Zhang <[hidden email]> wrote:
Hi,

It's what I'm seeing. If timers are not fired at the end of window, a state (in the window) whose timestamp is after the timer will also be emitted. That's a problem for event-time trigger.

Thanks,
Manu


On Thu, Oct 27, 2016 at 10:29 PM Aljoscha Krettek <[hidden email]> wrote:
Hi,
is that example input/output what you would like to achieve or what you are currently seeing with Flink? I think for your use case a custom Trigger would be required that works like the event-time trigger but additionally registers timers for each element where you want to emit.

Cheers,
Aljoscha

On Wed, 26 Oct 2016 at 04:04 Manu Zhang <[hidden email]> wrote:
Hi Aljoscha,

Thanks for your response.  My use case is to track user trajectory based on page view event when they visit a website.  The input would be like a list of PageView(userId, url, eventTimestamp) with watermarks (= eventTimestamp - duration). I'm trying SessionWindows with some event time trigger. Note we can't wait for the end of session window due to latency. Instead, we want to emit the user trajectories whenever a buffered PageView's event time is passed by watermark. I tried ContinuousEventTimeTrigger and a custom trigger which sets timer on each element's timestamp. For both triggers I've witnessed a problem like the following (e.g. a session gap of 5)

PageView("user1", "http://foo", 1)
PageView("user1", "http://foo/bar", 2)
Watermark(1) => emit UserTrajectory("user1", "http://foo -> http://foo/bar", [1,6])
PageView("user1", "http://foo/bar/foobar", 5)
Watermark(4) => emit UserTrajectory("user1", "http://foo -> http://foo/bar -> http://foo/bar/foobar", [1, 10])

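The URLs that the watermark has not yet passed (http://foo/bar in the first emission, http://foo/bar/foobar in the second) should not be included, since events with earlier timestamps may not have arrived yet.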
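The desired output amounts to emitting only those buffered page views whose timestamps the current watermark has passed. A minimal plain-Python sketch of that rule on the stream above, assuming a watermark w covers timestamps <= w (consistent with Watermark(1) emitting the view at timestamp 1); `PageView` and `trajectory` here are hypothetical names, not Flink classes:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PageView:
    user_id: str
    url: str
    timestamp: int


def trajectory(buffer, watermark):
    """Join the URLs of buffered views the watermark has passed, in event-time order."""
    ready = sorted((v for v in buffer if v.timestamp <= watermark),
                   key=lambda v: v.timestamp)
    return " -> ".join(v.url for v in ready)


buffer = [PageView("user1", "http://foo", 1), PageView("user1", "http://foo/bar", 2)]
print(trajectory(buffer, 1))  # http://foo
buffer.append(PageView("user1", "http://foo/bar/foobar", 5))
print(trajectory(buffer, 4))  # http://foo -> http://foo/bar
```

Because the views are sorted by timestamp before joining, the trajectory also comes out in event-time order when views arrive out of order.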


Thanks,
Manu


On Tue, Oct 25, 2016 at 1:36 AM Aljoscha Krettek <[hidden email]> wrote:
Hi,
with some additional information we might be able to figure this out together. What specific combination of WindowAssigner/Trigger are you using for your example and what is the input stream (including watermarks)?

Cheers,
Aljoscha

On Mon, 24 Oct 2016 at 06:30 Manu Zhang <[hidden email]> wrote:
Hi,

Say I have a window state of List(("a", 1:00), ("b", 1:03), ("c", 1:06)) that is triggered to emit whenever the watermark passes the timestamp of an element. For example:

on watermark(1:01), List(("a", 1:00)) is emitted
on watermark(1:04), List(("a", 1:00), ("b", 1:03)) is emitted
on watermark(1:07), List(("a", 1:00), ("b", 1:03), ("c", 1:06)) is emitted

It seems that if ("c", 1:06) is processed before watermark(1:04), List(("a", 1:00), ("b", 1:03), ("c", 1:06)) will be emitted on watermark(1:04). This is incorrect, since elements with timestamps between 1:04 and 1:06 may not have arrived yet.

I guess this is because the watermark trigger doesn't check whether an element's timestamp has been passed.
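To see why the early arrival of ("c", 1:06) matters, here is a small plain-Python replay of both arrival orders. It is a hypothetical model, not Flink code: times are minutes past 1:00, and every watermark is assumed to fire, as it does in this example. Emitting the whole buffer makes the output depend on arrival order, while emitting only elements whose timestamps the watermark has passed gives the same result for both orders.

```python
def replay(events, filtered):
    """Replay ("elem", name, ts) / ("wm", ts) events; return what each watermark emits."""
    buffer, emitted = [], []
    for kind, *payload in events:
        if kind == "elem":
            buffer.append(tuple(payload))  # (name, minutes past 1:00)
        else:  # kind == "wm": assume every watermark fires
            (wm,) = payload
            out = [e for e in buffer if e[1] <= wm] if filtered else list(buffer)
            emitted.append((wm, [name for name, _ in sorted(out, key=lambda e: e[1])]))
    return emitted


in_order = [("elem", "a", 0), ("wm", 1), ("elem", "b", 3), ("wm", 4), ("elem", "c", 6), ("wm", 7)]
c_early = [("elem", "a", 0), ("wm", 1), ("elem", "b", 3), ("elem", "c", 6), ("wm", 4), ("wm", 7)]

print(replay(c_early, filtered=False)[1])  # (4, ['a', 'b', 'c'])
print(replay(c_early, filtered=True)[1])   # (4, ['a', 'b'])
```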

Please correct me if any of the above is not right.

Thanks,
Manu Zhang




Re: watermark trigger doesn't check whether element's timestamp is passed

Aljoscha Krettek
Hi,
a contributor (Bonaventure Del Monte) has started working on this. He should open a Jira this week.

Cheers,
Aljoscha

On Tue, 1 Nov 2016 at 23:57 aj heller <[hidden email]> wrote:

Hi Manu, Aljoscha,

I had been interested in implementing FLIP-2, but I haven't been able to make time for it. There is no implementation yet that I'm aware of, and I'll gladly step aside (or help out how I can) if you or anyone else is interested in taking charge of it.

That said, I'm also not sure if discussions are ongoing. I had hoped to prototype the proposal as is, to have something more concrete to discuss.

Cheers,
aj


Re: watermark trigger doesn't check whether element's timestamp is passed

Ventura Del Monte
Hello,

I have just opened the JIRA issue and I have almost completed the implementation of this feature. I will keep you posted :)

Cheers,
Ventura



This message, for the D. Lgs n. 196/2003 (Privacy Code), may contain confidential and/or privileged information. If you are not the addressee or authorized to receive this for the addressee, you must not use, copy, disclose or take any action based on this message or any information herein. If you have received this message in error, please advise the sender immediately by reply e-mail and delete this message. Thank you for your cooperation.

On Wed, Nov 2, 2016 at 2:18 PM, Aljoscha Krettek <[hidden email]> wrote:
Hi,
a contributor (Bonaventure Del Monte) has started working on this. He should open a Jira this week.

Cheers,
Aljoscha


Re: watermark trigger doesn't check whether element's timestamp is passed

Manu Zhang
Thanks, that will be great. I'd like to test against my particular use cases once your PR is available.

Manu

On Wed, Nov 2, 2016 at 11:09 PM Ventura Del Monte <[hidden email]> wrote:
Hello,

I have just opened the JIRA issue and I have almost completed the implementation of this feature. I will keep you posted :)

Cheers,
Ventura