Setting EventTime window width using stream data


Paul Joireman

Hi all,


Is it possible to dynamically set the size/width of a SlidingEventTimeWindows assigner based on data coming from the stream? Our use case is as follows. We create a stream sourced from an external system; it arrives as a JSON string, which is deserialized to a stream of POJOs. The deserialized object contains an event timestamp, data, and details about how to analyze the contained data, as well as the length of the time window to analyze. It would be ideal if we could leverage the functionality of SlidingEventTimeWindows but, instead of hard-coding the window times, use data from the message to configure this on the fly. Is this possible?


Note: the stream is keyed and timestamped with event time and the window size will not change for the same key.


Ultimately we need a way to dynamically change the window sizes in order to adjust to different timing specifications that are not directly controlled, or even known beforehand, by the person writing the Flink analysis program.


Paul


Re: Setting EventTime window width using stream data

Aljoscha Krettek
Just checking: do all the elements that would fall into a window of length X also have X as a property? In that case you should be able to do something like this:

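// Replacement for assignWindows in a copy of SlidingEventTimeWindows.
// The copy must extend WindowAssigner<PojoType, TimeWindow>; `offset` remains a field of the assigner.
public Collection<TimeWindow> assignWindows(PojoType element, long timestamp, WindowAssignerContext context) {
    // Size and slide are read from the element instead of being fixed in the assigner.
    long size = element.windowSize;
    long slide = element.windowSlide;

    if (timestamp > Long.MIN_VALUE) {
        List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
        // Start of the latest slide-aligned window that contains the timestamp.
        long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
        // Walk backwards one slide at a time over every window that still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new TimeWindow(start, start + size));
        }
        return windows;
    } else {
        throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                "'DataStream.assignTimestampsAndWatermarks(...)'?");
    }
}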

This is basically a copy of SlidingEventTimeWindows in which assignWindows is replaced by the above, and size/slide are not stored in the assigner but read from the element. This only works if all elements of a key that should fall into the same windows have the same size/slide property; otherwise they would spawn different windows.
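To make the window arithmetic easier to check without a Flink cluster, here is a minimal plain-Java sketch of the same loop. It is only an illustration: `DynamicWindowSketch`, `Window`, and `windowStart` are hypothetical names, `Window` stands in for Flink's TimeWindow, and `windowStart` is assumed to behave like `TimeWindow.getWindowStartWithOffset` (largest slide-aligned start not after the timestamp).

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of per-element sliding-window assignment (no Flink dependency). */
final class DynamicWindowSketch {

    /** Hypothetical stand-in for Flink's TimeWindow: the half-open interval [start, end). */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /** Largest slide-aligned start <= timestamp (assumed equivalent of getWindowStartWithOffset). */
    static long windowStart(long timestamp, long offset, long slide) {
        return timestamp - Math.floorMod(timestamp - offset, slide);
    }

    /** Returns every window of the given size/slide that contains the timestamp, newest first. */
    static List<Window> assignWindows(long timestamp, long size, long slide, long offset) {
        if (size <= 0 || slide <= 0) {
            throw new IllegalArgumentException("size and slide must be positive");
        }
        List<Window> windows = new ArrayList<>();
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = windowStart(timestamp, offset, slide);
                start > timestamp - size;
                start -= slide) {
            windows.add(new Window(start, start + size));
        }
        return windows;
    }

    public static void main(String[] args) {
        // Same timestamp, two different size/slide settings taken "from the element".
        System.out.println(assignWindows(10_500L, 3_000L, 1_000L, 0L));
        System.out.println(assignWindows(10_500L, 10_000L, 5_000L, 0L));
    }
}
```

With a timestamp of 10500, a size of 3000 and a slide of 1000, the loop yields three windows starting at 10000, 9000 and 8000. The same timestamp with size 10000 and slide 5000 yields two windows starting at 10000 and 5000, which is why elements of one key that carry different size/slide values end up in different windows.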

Cheers,
Aljoscha

On Tue, 30 Aug 2016 at 21:28 Paul Joireman <[hidden email]> wrote:



Re: Setting EventTime window width using stream data

Paul Joireman

Thanks Aljoscha,


The elements can certainly be configured to carry the window size and slide, and those values will not change for the same elements. There may be a different class of elements with a different window or slide, but those values will be essentially final for that class of elements.
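As a rough illustration of what such an element could look like, here is a hypothetical sketch. The class name `WindowedEvent`, its field names, and the `sharesWindowsWith` helper are assumptions for illustration, not part of our actual code. The constructor rejects non-positive values because the assigner above divides by the slide.

```java
/** Hypothetical event carrying its own window configuration; the values are fixed per class of elements. */
final class WindowedEvent {
    final String key;
    final long eventTimestamp;
    final long windowSize;   // window length, in the same time unit as eventTimestamp
    final long windowSlide;  // slide interval, in the same time unit as eventTimestamp

    WindowedEvent(String key, long eventTimestamp, long windowSize, long windowSlide) {
        if (windowSize <= 0 || windowSlide <= 0) {
            throw new IllegalArgumentException("windowSize and windowSlide must be positive");
        }
        this.key = key;
        this.eventTimestamp = eventTimestamp;
        this.windowSize = windowSize;
        this.windowSlide = windowSlide;
    }

    /** True if both events carry the same size/slide and so would be assigned to the same set of windows. */
    boolean sharesWindowsWith(WindowedEvent other) {
        return windowSize == other.windowSize && windowSlide == other.windowSlide;
    }

    public static void main(String[] args) {
        WindowedEvent a = new WindowedEvent("sensor-1", 10_500L, 3_000L, 1_000L);
        WindowedEvent b = new WindowedEvent("sensor-1", 11_200L, 3_000L, 1_000L);
        WindowedEvent c = new WindowedEvent("sensor-2", 11_200L, 10_000L, 5_000L);
        System.out.println(a.sharesWindowsWith(b)); // same class of elements
        System.out.println(a.sharesWindowsWith(c)); // different class of elements
    }
}
```

The fields are final here only to mirror "essentially final"; a real Flink job may need a public no-argument constructor and settable fields for the class to be handled as a POJO type.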


Paul


From: Aljoscha Krettek <[hidden email]>
Sent: Wednesday, August 31, 2016 10:53:17 AM
To: [hidden email]
Cc: Chad Conkright
Subject: Re: Setting EventTime window width using stream data
 