Window based on tuple timestamps

Philipp Goetze
Hello again,

Another question from me =). Could you provide an example of how to correctly use windows based on the timestamps in the tuples (i.e., non-realtime)?

As a simple example, I tried something like this:

    val w4 = rdfSource
      .window(Time.of(200000L, customTimestamp, 1219625100000L)) // windows of length 200000 (in customTimestamp's units), starting at 1219625100000L
      .mapWindow(customMap _) // apply customMap to the tuples of each window
      .flatten()              // turn the per-window results back into a plain stream
      .print()

In customMap I simply collect all tuples of a window into one list. However, this does not work correctly: sometimes all records end up in a single list, although the example data should produce two windows.
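
For sorted input, the expected result is easy to compute: each tuple belongs to the tumbling window [start + k*size, start + (k+1)*size) that contains its timestamp. The following is a minimal plain-Scala sketch (no Flink involved) of that reference grouping, which can be used to check what the streaming job should print. The TimedRecord type and the helper names are hypothetical; only the window length 200000 and the start time 1219625100000L come from the snippet above, and timestamps are assumed to be Longs in the same units that customTimestamp returns.

```scala
// Reference grouping for tumbling timestamp-based windows, in plain Scala (no Flink).
// Hypothetical record type: a payload plus the timestamp customTimestamp would extract.
final case class TimedRecord(payload: String, ts: Long)

object TumblingReference {
  val WindowSize: Long = 200000L          // window length from the Time.of(...) call
  val WindowStart: Long = 1219625100000L  // start time from the Time.of(...) call

  // Start of the window [s, s + size) containing ts. floorDiv (rather than /)
  // also places timestamps before the start time into the correct earlier window.
  def windowStart(ts: Long, size: Long = WindowSize, start: Long = WindowStart): Long =
    start + java.lang.Math.floorDiv(ts - start, size) * size

  // Groups records by window (keeping arrival order inside each window)
  // and returns the windows sorted by their start time.
  def group(records: Seq[TimedRecord]): Seq[(Long, Seq[TimedRecord])] =
    records.groupBy(r => windowStart(r.ts)).toSeq.sortBy(_._1)
}
```

For example, records stamped 1219625100000L and 1219625300000L land in two separate windows, one list per window.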

Best Regards,
Philipp

Re: Window based on tuple timestamps

Aljoscha Krettek
Hi Philipp,
Am I correct in assuming that your tuples do not arrive in the order of the timestamps you extract? Unfortunately, the current windowing implementation does not work correctly in that case. We are working hard on fixing this for the upcoming 0.10 release, though. If you are interested in what we are doing there, you can check out the design documents in the Flink wiki:

Cheers,
Aljoscha
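
To illustrate why out-of-order arrival matters, here is a toy model of a window operator that decides window boundaries on the fly: it closes the current window as soon as it sees a timestamp at or past the window's end, and puts every other element into whichever window is currently open. This behavior is a simplifying assumption for illustration, not Flink's actual implementation; the NaiveTrigger name is hypothetical.

```scala
// Toy model (not Flink's code): a single-pass discretizer that emits the current
// window as soon as an element with timestamp >= the window end arrives.
// An element that arrives late is put into whatever window is currently open.
object NaiveTrigger {
  def windows(timestamps: Seq[Long], size: Long, start: Long): Seq[Seq[Long]] = {
    val done = Vector.newBuilder[Seq[Long]]
    var current = Vector.empty[Long]
    var end = start + size
    for (ts <- timestamps) {
      if (ts >= end) {
        // Close the open window and advance the end past ts (empty windows are skipped).
        if (current.nonEmpty) done += current
        current = Vector.empty
        while (ts >= end) end += size
      }
      current :+= ts // a late element lands here, i.e. possibly in the wrong window
    }
    if (current.nonEmpty) done += current
    done.result()
  }
}
```

With size 100 and start 0, the sorted input 0, 10, 100, 110 yields [0, 10] and [100, 110], while the same elements arriving as 0, 100, 10, 110 yield [0] and [100, 10, 110]: the element 10 arrives after its window was closed.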

On Fri, 18 Sep 2015 at 13:19 Philipp Goetze wrote: [original message shown above]

Re: Window based on tuple timestamps

Philipp Goetze
Hi Aljoscha,

I am not quite sure whether the big data sets are completely ordered, but my test data (just 10 records) is sorted and contains only two distinct timestamps. Sometimes I end up with two windows (which is correct) and sometimes with only one, so the result is not deterministic. Should I perhaps set the parallelism to 1 somewhere?
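
One way a sorted source can still reach a window operator out of order is parallel execution: if the records are spread over several parallel instances and their outputs are merged downstream, the merged order depends on timing. The toy model below is plain Scala; the round-robin distribution and the InterleavingModel name are assumptions, and Flink's actual partitioning and scheduling are not modeled. It enumerates every order in which two partitions can be merged.

```scala
// Toy model (not Flink's scheduler): a sorted input is split across parallel
// instances, and a downstream operator receives their outputs in some interleaving.
object InterleavingModel {
  // Assumed round-robin split of the input into `parallelism` partitions.
  def roundRobin(input: Seq[Long], parallelism: Int): Seq[Seq[Long]] =
    (0 until parallelism).map { p =>
      input.zipWithIndex.collect { case (x, i) if i % parallelism == p => x }
    }

  // Every order in which two partitions' elements can arrive downstream,
  // preserving the order within each partition.
  def interleavings(a: Seq[Long], b: Seq[Long]): Seq[Seq[Long]] =
    if (a.isEmpty) Seq(b)
    else if (b.isEmpty) Seq(a)
    else interleavings(a.tail, b).map(a.head +: _) ++ interleavings(a, b.tail).map(b.head +: _)

  def isSorted(xs: Seq[Long]): Boolean =
    xs.zip(xs.drop(1)).forall { case (x, y) => x <= y }
}
```

For the input 0, 10, 100, 110 split over two instances, there are six possible merged orders and only one of them is sorted; with parallelism 1 there is a single partition and the input order is preserved.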

Best Regards,
Philipp

On 18.09.2015 17:05, Aljoscha Krettek wrote: [message shown above]

Re: Window based on tuple timestamps

Aljoscha Krettek
Hi,
Yes, setting the parallelism to 1 might solve it, but it will also cripple performance. Next week, the first part of our rework of the windowing system should make it into master, so you would then be able to test your programs with it if you are using the 0.10-SNAPSHOT version.
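
A common general technique for windowing out-of-order data is to buffer elements per window and emit a window only once a watermark, an estimate of how far event time has progressed, has passed the window's end. The sketch below is plain Scala under stated assumptions (no element arrives more than a fixed maxDelay behind the largest timestamp seen; late elements whose window was already emitted are dropped). It is not the API of the reworked Flink windowing, and the BufferedWindows class and its parameters are hypothetical.

```scala
import scala.collection.mutable

// Sketch of watermark-style buffering for tumbling timestamp-based windows.
// Assumption: no element arrives more than `maxDelay` behind the largest timestamp
// seen so far; a late element whose window was already emitted is dropped.
final class BufferedWindows(size: Long, start: Long, maxDelay: Long) {
  private val open = mutable.TreeMap.empty[Long, Vector[Long]] // window start -> buffered timestamps
  private val emitted = Vector.newBuilder[(Long, Vector[Long])]
  private var maxSeen = Long.MinValue

  private def windowStart(ts: Long): Long =
    start + java.lang.Math.floorDiv(ts - start, size) * size

  // Under the assumption, no future element has a timestamp below this value.
  private def watermark: Long =
    if (maxSeen == Long.MinValue) Long.MinValue else maxSeen - maxDelay

  def add(ts: Long): Unit = {
    val w = windowStart(ts)
    if (w + size > watermark) { // otherwise the window was already emitted: drop
      open(w) = open.getOrElse(w, Vector.empty[Long]) :+ ts
      maxSeen = math.max(maxSeen, ts)
      fire()
    }
  }

  // Emit, in start order, every window whose end is at or below the watermark.
  private def fire(): Unit =
    while (open.nonEmpty && open.head._1 + size <= watermark) {
      emitted += open.head
      open -= open.head._1
    }

  // End of input: emit all remaining windows and return everything emitted.
  def finish(): Seq[(Long, Vector[Long])] = {
    open.foreach(kv => emitted += kv)
    open.clear()
    emitted.result()
  }
}
```

With size 100, start 0 and maxDelay 50, the arrival order 0, 100, 10, 110, 250, 20 yields the windows starting at 0, 100 and 200 with contents [0, 10], [100, 110] and [250]; the element 20 is dropped because its window was already emitted. The trade-off is latency: each window is held back until the watermark passes its end.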

I'm sorry we don't have a better answer right now, but we are working hard on fixing these issues.

Cheers,
Aljoscha

On Sat, 19 Sep 2015 at 12:53 Philipp Goetze wrote: [earlier messages shown above]