ContinuousEventTimeTrigger breaks coGrouped windowed streams?

William Saar-2
Hi!

My topology below seems to work when I comment out all the lines with ContinuousEventTimeTrigger, but prints nothing when those lines are in there. Can I coGroup two large time windows that use a different trigger interval than the window size? Even if ContinuousEventTimeTrigger doesn't work for coGroups, I would not expect the output to be completely silent.

The streams I'm coGrouping come from 2 different Kafka sources and use event time with zero out-of-orderness. I'm on Flink 1.1.3, if that helps.

DataStream<CommonType> stream1 =
    <stream of event type1>
    .window(TumblingEventTimeWindows.of(Time.seconds(30)))
    .trigger(ContinuousEventTimeTrigger.of(Time.seconds(10)))
    .fold(...);
    

DataStream<CommonType> stream2 =
    <stream of event type2>
    .window(TumblingEventTimeWindows.of(Time.seconds(30)))
    .trigger(ContinuousEventTimeTrigger.of(Time.seconds(10)))
    .fold(...);

    

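// coGroup the two pre-aggregated streams; a CoGroupFunction has to be applied before printing
stream1.coGroup(stream2).where(...).equalTo(...)
    .window(TumblingEventTimeWindows.of(Time.seconds(30)))
    .trigger(ContinuousEventTimeTrigger.of(Time.seconds(10)))
    .apply(...)
    .print();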

Thanks,

William

Re: ContinuousEventTimeTrigger breaks coGrouped windowed streams?

Gyula Fóra
Hi William,

I am wondering whether the ContinuousEventTimeTrigger is the best choice here (it never cleans up the state as far as I know).

Have you tried plain SlidingEventTimeWindows as your window assigner?

Cheers,
Gyula

Re: ContinuousEventTimeTrigger breaks coGrouped windowed streams?

William Saar-2
Thanks!

Yes, SlidingEventTimeWindows works, but is there any way to pre-aggregate with tumbling windows that emit results more often than their window size? Perhaps I can do this before I merge the streams? (But if ContinuousEventTimeTrigger is the only way to do that, it's bad if it doesn't clean up its state.)

I assume sliding-window state will be larger and less efficient than tumbling-window state, since a sliding fold needs to keep all events in the window and recompute the fold as events slide out of the window, while a tumbling fold only needs to keep the aggregate and can discard events as it folds them.

I am reviewing how to replace a batch solution based on 3 bucketed aggregations of different window sizes. Tumbling windows seem like a perfect fit, since they would only need to keep the 3 aggregations in memory, while sliding windows would need to keep up to 3 copies of all events (for at least the smallest window size) to compute the same type of results.

Regards!
William


Re: ContinuousEventTimeTrigger breaks coGrouped windowed streams?

Gyula Fóra
Hi,

Sliding windows don't have to slide by one event at a time; in essence they are "jumping" windows. It is pretty much like saying: I am interested in the computation over the last 2 days, computed every 2 hours or so. This also means that we can start pre-aggregating for every slide, so we don't have to keep all events.
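
To make the per-slide pre-aggregation concrete, here is a minimal plain-Java sketch. It is not Flink code, and the class and method names are made up for illustration. It assumes a simple sum as the aggregate, non-negative timestamps, and a window size that is a multiple of the slide. Each event is folded into a slide-sized bucket, and a window result is the sum of the buckets it covers, so no individual events are stored.

```java
import java.util.TreeMap;

// Hypothetical model of per-slide pre-aggregation (not a Flink class).
class SlidingPaneSum {
    private final long sizeMs;
    private final long slideMs;
    // One running sum per slide-sized bucket, keyed by bucket start time.
    private final TreeMap<Long, Long> panes = new TreeMap<>();

    SlidingPaneSum(long sizeMs, long slideMs) {
        if (sizeMs % slideMs != 0) {
            throw new IllegalArgumentException("size must be a multiple of slide");
        }
        this.sizeMs = sizeMs;
        this.slideMs = slideMs;
    }

    // Fold an event into its bucket; the event itself is not retained.
    void add(long timestampMs, long value) {
        long paneStart = timestampMs - (timestampMs % slideMs);
        panes.merge(paneStart, value, Long::sum);
    }

    // Result for the window [windowEndMs - sizeMs, windowEndMs).
    long result(long windowEndMs) {
        long sum = 0;
        for (long v : panes.subMap(windowEndMs - sizeMs, true, windowEndMs, false).values()) {
            sum += v;
        }
        return sum;
    }

    // After emitting the window ending at windowEndMs, drop buckets
    // that no later window can cover.
    void evictBefore(long windowEndMs) {
        panes.headMap(windowEndMs - sizeMs + slideMs, false).clear();
    }

    int paneCount() {
        return panes.size();
    }

    public static void main(String[] args) {
        SlidingPaneSum agg = new SlidingPaneSum(30_000, 10_000);
        agg.add(1_000, 1);
        agg.add(12_000, 2);
        agg.add(25_000, 3);
        agg.add(31_000, 4);
        System.out.println(agg.result(30_000)); // window [0, 30000): 1 + 2 + 3 = 6
        agg.evictBefore(30_000);
        System.out.println(agg.result(40_000)); // window [10000, 40000): 2 + 3 + 4 = 9
    }
}
```

With a 30-second window and a 10-second slide, this keeps at most a handful of sums per key rather than every event. Flink's own SlidingEventTimeWindows works a bit differently (each element is assigned to size/slide overlapping windows, each with its own aggregate when fold or reduce is used), but the state-size argument is the same.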

Does this make sense?

Gyula

Re: ContinuousEventTimeTrigger breaks coGrouped windowed streams?

Gyula Fóra
Hello,

I answered on the Flink mailing list, but we can always have a quick Skype chat if you want to discuss some details; that's probably easier :)

Gyula

Re: ContinuousEventTimeTrigger breaks coGrouped windowed streams?

Maximilian Michels
Hi William,

I've reproduced your example locally for some toy data and everything
was working as expected (with the early triggering). So I'm assuming
either there is something wrong with your input data or the behavior
doesn't always manifest.

Here's the example I ran, in case you want to try it:
https://gist.github.com/mxm/6cb1e6e9a572a26df76917176849f405

Gyula is right, the ContinuousEventTimeTrigger never purges the window,
but you can circumvent that by extending this trigger and purging
at the end of the window, similar to what the EventTimeTrigger does.
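
A rough plain-Java model of that suggestion is sketched below. It does not use Flink's Trigger class: the Result enum only mirrors a subset of Flink's TriggerResult values (CONTINUE, FIRE, FIRE_AND_PURGE), and the class and method names are hypothetical. It assumes firings on interval boundaries and a final firing once event time reaches the window's max timestamp. In a real job, the same decisions would go into the event-time callback of a custom trigger.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of "fire early, purge at the end" (not a Flink class).
class PurgingTriggerModel {
    enum Result { CONTINUE, FIRE, FIRE_AND_PURGE }

    // Decision taken when an event-time timer fires at `time`.
    static Result onEventTime(long time, long windowMaxTimestamp, long intervalMs) {
        if (time >= windowMaxTimestamp) {
            return Result.FIRE_AND_PURGE; // final result, then drop the window state
        }
        if (time % intervalMs == 0) {
            return Result.FIRE;           // early result, keep the window state
        }
        return Result.CONTINUE;
    }

    // Applies a decision to the window contents and returns what was emitted
    // (empty if nothing fired).
    static List<Integer> apply(Result result, List<Integer> windowContents) {
        List<Integer> emitted = new ArrayList<>();
        if (result != Result.CONTINUE) {
            emitted.addAll(windowContents);
        }
        if (result == Result.FIRE_AND_PURGE) {
            windowContents.clear();
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> window = new ArrayList<>(Arrays.asList(1, 2));
        long maxTs = 29_999; // max timestamp of the window [0, 30000)
        System.out.println(apply(onEventTime(10_000, maxTs, 10_000), window)); // early firing
        System.out.println(apply(onEventTime(maxTs, maxTs, 10_000), window));  // final firing
        System.out.println(window.isEmpty()); // state is gone after the purge
    }
}
```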

-Max


Re: ContinuousEventTimeTrigger breaks coGrouped windowed streams?

William Saar-2
Thanks!
One difference is that my topology had 2 sources. I have updated your example to also use 2 sources and that breaks the co-group operation in the example as well!

https://gist.github.com/saarw/8f9513435a41ab29b36da77c16a8b0ed

Nice to know that purging can be added to the event trigger.

William

Re: ContinuousEventTimeTrigger breaks coGrouped windowed streams?

Maximilian Michels
The problem here is that the ContinuousEventTimeTrigger is kind of
broken. It relies on the first element to register a future timer, but
event time might not progress that far. It should additionally trigger
at the end of the window.
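
One way to picture this is the small plain-Java model below. It is a simplified reading, not Flink's code, and the names are hypothetical. It assumes that the first timer is set to the next interval boundary after the first element, that the window's state is discarded once event time reaches the window's max timestamp, and (for the example values) that a result emitted by an upstream 30-second window carries that window's max timestamp, 29999.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of when a continuous trigger fires for one window.
class ContinuousFiringModel {
    // Returns the event-time instants at which the window emits a result.
    static List<Long> fireTimes(long firstElementTs, long intervalMs,
                                long windowMaxTimestamp, boolean alsoFireAtWindowEnd) {
        List<Long> fires = new ArrayList<>();
        // First timer: the next interval boundary after the first element.
        long next = firstElementTs - (firstElementTs % intervalMs) + intervalMs;
        // Timers later than the window's max timestamp never fire,
        // because the window is already gone by then.
        while (next <= windowMaxTimestamp) {
            fires.add(next);
            next += intervalMs;
        }
        // The fix: additionally fire when the window ends.
        boolean endAlreadyCovered =
                !fires.isEmpty() && fires.get(fires.size() - 1) == windowMaxTimestamp;
        if (alsoFireAtWindowEnd && !endAlreadyCovered) {
            fires.add(windowMaxTimestamp);
        }
        return fires;
    }

    public static void main(String[] args) {
        long maxTs = 29_999; // max timestamp of the window [0, 30000)
        // Element stamped with the upstream window's max timestamp:
        System.out.println(fireTimes(29_999, 10_000, maxTs, false)); // no firing at all
        System.out.println(fireTimes(29_999, 10_000, maxTs, true));  // fires at the window end
        // Element early in the window:
        System.out.println(fireTimes(1_000, 10_000, maxTs, true));
    }
}
```

Under these assumptions, an element that arrives at the very end of the window schedules its first timer at 30000, which is after the window has been discarded, so nothing is ever printed unless the trigger also fires at the window end.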

Here is a version with an improved continuous trigger:
https://gist.github.com/mxm/a1d6b22c772971c98e2ce886dc9818b1?ts=2

By the way, if you remove the ContinuousEventTimeTrigger (which will
implicitly set a regular EventTimeTrigger) for the CoGroup, it also
works fine. I don't know whether you really want early firings there.

Cheers,
Max

PS: Final word on the cleanup. The state should always be cleaned up
at the end of the window + allowedLateness you have set.

Re: ContinuousEventTimeTrigger breaks coGrouped windowed streams?

William Saar-2
Thanks a lot! Your patched trigger at least causes results to be emitted when the window expires, but I think I may be better off with a custom trigger that always fires when a new event arrives in the window (I figured out how to write one of those).

One more question about grouping streams from different Kafka sources. Can I skip windowing the separate streams before grouping them, and just union the two streams without windows and only window the resulting union stream? Or will that cause events from the different sources to end up out of order with each other in the union stream and get dropped when they arrive at the window?

Is there anything special I should do or avoid when joining events from different Kafka sources, to make Flink process them in the order of their event times and prevent events from getting dropped (even if one source has a lot more events from many more days and is slower to read than the other, etc.)?

Re: ContinuousEventTimeTrigger breaks coGrouped windowed streams?

Maximilian Michels
I've submitted a patch for 1.1.4 and 1.2.0 which improves the
ContinuousEventTimeTrigger so that it also fires at the end of the window:
https://issues.apache.org/jira/browse/FLINK-5149
If your requirement is to fire on every element, you're indeed better
off with a custom trigger.

> One more question about grouping streams from different Kafka sources. Can I skip windowing the separate streams before grouping them and just union the two streams without windows and only window the resulting union stream? or will that cause events from the different sources to end up out of order with each other in the union stream and get dropped when they arrive at the window?

You can union the two streams without windowing them beforehand. The
downstream operator will have two inputs and receive data from both
Kafka sources. There is no order defined among the inputs since both
streams are potentially infinite. The Event Time mechanism will ensure
that the time doesn't progress further than the minimum of the latest
Watermarks sent out by the two sources.
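
The "minimum of the latest watermarks" rule can be sketched in a few lines of plain Java. This is a simplified model of the behavior described above, not Flink's implementation; the class and method names are hypothetical.

```java
import java.util.Arrays;

// Hypothetical model of event time at an operator with several inputs.
class MinWatermarkModel {
    private final long[] latest; // latest watermark seen on each input

    MinWatermarkModel(int inputs) {
        latest = new long[inputs];
        Arrays.fill(latest, Long.MIN_VALUE); // nothing seen yet
    }

    // Records a watermark from one input and returns the operator's event time,
    // which is the minimum over the latest watermarks of all inputs.
    long onWatermark(int input, long watermark) {
        latest[input] = Math.max(latest[input], watermark);
        long min = Long.MAX_VALUE;
        for (long w : latest) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        MinWatermarkModel operator = new MinWatermarkModel(2);
        operator.onWatermark(0, 50_000);                     // fast source runs ahead
        System.out.println(operator.onWatermark(1, 20_000)); // 20000: held back by the slow source
        System.out.println(operator.onWatermark(0, 90_000)); // still 20000
        System.out.println(operator.onWatermark(1, 60_000)); // 60000: slow source caught up
    }
}
```

In this model a window ending at 30000 cannot fire until both inputs have passed it, so events from the faster source are buffered rather than dropped while the slower source catches up.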

> Anything special I should do/avoid to join events from different Kafka sources to make Flink process them in the order of their event times and prevent events from getting dropped (even if one source has a lot more events from many more days and is slower to read than the other etc.)?

No, if your requirement is to union the two sources and you have
correct Watermark generation in place, you should be fine. The only
thing to consider is whether the two sources are correlated in time. If
they're not, one input might hold back a Window computation for an
arbitrary amount of time, during which a lot of state can accumulate in
memory.

-Max


On Fri, Nov 25, 2016 at 8:40 AM, William Saar <[hidden email]> wrote:

> Thanks a lot! Your patched window at least causes events to be fired when
> the window expires, but I think I may be better off with a custom trigger
> that always fires when a new event arrives in the window (I figured out how
> to write one of those).
>
> One more question about grouping streams from different Kafka sources. Can I
> skip windowing the separate streams before grouping them and just union the
> two streams without windows and only window the resulting union stream? or
> will that cause events from the different sources to end up out of order
> with each other in the union stream and get dropped when they arrive at the
> window? Anything special I should do/avoid to join events from different
> Kafka sources to make Flink process them in the order of their event times
> and prevent events from getting dropped (even if one source has a lot more
> events from many more days and is slower to read than the other etc.)?
>
>
> ----- Original Message -----
> From: [hidden email]
> To: "[hidden email]" <[hidden email]>
> Sent: Wed, 23 Nov 2016 15:53:27 +0100
> Subject: Re: ContinuousEventTimeTrigger breaks coGrouped windowed streams?
>
> The problem here is that the ContinuousEventTimeTrigger is kind of
> broken. It relies on the first element to register a future timer, but
> event time might not progress that far. It should additionally fire
> at the end of the window.
>
> Here is a version with an improved continuous trigger:
> https://gist.github.com/mxm/a1d6b22c772971c98e2ce886dc9818b1?ts=2
>
> By the way, if you remove the ContinuousEventTimeTrigger (which will
> implicitly set a regular EventTimeTrigger) for the CoGroup, it also
> works fine. I don't know whether you really want early firings there.
>
> Cheers,
> Max
>
> PS: Final word on the cleanup. The state should always be cleaned up
> at the end of the window + allowedLateness you have set.
>
> On Tue, Nov 22, 2016 at 11:08 PM, William Saar <[hidden email]> wrote:
>> Thanks!
>> One difference is that my topology had 2 sources. I have updated your
>> example to also use 2 sources and that breaks the co-group operation in
>> the example as well!
>>
>> https://gist.github.com/saarw/8f9513435a41ab29b36da77c16a8b0ed
>>
>> Nice to know that purging can be added to the event trigger.
>>
>> William
>>
>>
>> ----- Original Message -----
>> From: [hidden email]
>> To: "[hidden email]" <[hidden email]>
>> Sent: Tue, 22 Nov 2016 11:50:52 +0100
>> Subject: Re: ContinuousEventTimeTrigger breaks coGrouped windowed streams?
>>
>> Hi William,
>>
>> I've reproduced your example locally for some toy data and everything
>> was working as expected (with the early triggering). So I'm assuming
>> either there is something wrong with your input data or the behavior
>> doesn't always manifest.
>>
>> Here's the example I run in case you want to try:
>> https://gist.github.com/mxm/6cb1e6e9a572a26df76917176849f405
>>
>> Gyula is right, the ContinuousEventTimeTrigger never purges the window,
>> but you can work around that by extending this trigger and purging
>> at the end of the window, similar to what the EventTimeTrigger does.
>>
>> -Max
>>
>>
>> On Mon, Nov 21, 2016 at 6:52 PM, William Saar <[hidden email]> wrote:
>>> Thanks!
>>>
>>> Yes, the SlidingEventTimeWindows works, but is there any way to
>>> pre-aggregate things with tumbling windows that emit events more often
>>> than their window size? Perhaps I can do this before I merge the
>>> streams? (But if ContinuousEventTimeTrigger is the only way to do that,
>>> it's bad if it doesn't clean up its state).
>>>
>>> I assume sliding window state will be too large and less efficient
>>> than tumbling windows, as a sliding fold needs to keep all events in the
>>> window and recompute the fold as events slide out of the window, while a
>>> tumbling fold just needs to keep the aggregation and can discard events
>>> as it folds them.
>>>
>>> I am reviewing how one would replace a batch solution based on 3 bucketed
>>> aggregations of different window sizes and it seems tumbling windows
>>> would be a perfect fit and would need to keep only the 3 aggregations in
>>> memory, while sliding windows would need to keep up to 3 copies of all
>>> events (for at least the smallest window size) to compute the same type
>>> of results.
>>>
>>> Regards!
>>> William
>>>
>>>
>>> ----- Original Message -----
>>> From: [hidden email]
>>> To: <[hidden email]>
>>> Sent: Mon, 21 Nov 2016 08:22:16 +0000
>>> Subject: Re: ContinuousEventTimeTrigger breaks coGrouped windowed streams?
>>>
>>>
>>>
>>> Hi William,
>>>
>>> I am wondering whether the ContinuousEventTimeTrigger is the best choice
>>> here (it never cleans up the state as far as I know).
>>>
>>> Have you tried the simple SlidingEventTimeWindows as your window
>>> function?
>>>
>>> Cheers,
>>> Gyula
>>>
>>> William Saar <[hidden email]> wrote (on Sat, Nov 19, 2016, 18:28):
>>>>
>>>> Hi!
>>>>
>>>> My topology below seems to work when I comment out all the lines with
>>>> ContinuousEventTimeTrigger, but prints nothing when the line is in
>>>> there. Can I coGroup two large time windows that use a different
>>>> trigger time than the window size? (even if the
>>>> ContinuousEventTimeTrigger doesn't work for coGroups, I would not
>>>> expect the result to be completely silent).
>>>>
>>>> The streams I'm coGrouping are from 2 different Kafka sources and use
>>>> event time with 0 out-of-orderness, and I'm on Flink 1.1.3, if that helps.
>>>>
>>>> DataStream<CommonType> stream1 =
>>>>     <stream of event type1>
>>>>     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
>>>>     .trigger(ContinuousEventTimeTrigger.of(Time.seconds(10)))
>>>>     .fold(...);
>>>>
>>>> DataStream<CommonType> stream2 =
>>>>     <stream of event type2>
>>>>     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
>>>>     .trigger(ContinuousEventTimeTrigger.of(Time.seconds(10)))
>>>>     .fold(...);
>>>>
>>>> stream1.coGroup(stream2).where(...).equalTo(...)
>>>>     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
>>>>     .trigger(ContinuousEventTimeTrigger.of(Time.seconds(10)))
>>>>     .print()
>>>>
>>>> Thanks,
>>>>
>>>> William
>>>>
>>>