Hi all,
Is it possible to dynamically set the size/width of a SlidingEventTimeWindows assigner based on data coming from the stream? Our use case is as follows. We create a stream sourced from an external system; it comes in as a JSON string, which is deserialized to a stream of POJOs. The deserialized object contains an event timestamp, the data, details about how to analyze the contained data, and the length of the time window to analyze. It would be ideal if we could leverage the functionality of SlidingEventTimeWindows but, instead of hard-coding the window times, use data from the message to configure them on the fly. Is this possible?
Note: the stream is keyed and timestamped with event time and the window size will not change for the same key.
Ultimately we need a way to dynamically change the window sizes in order to adjust to different timing specifications that are not directly controlled, or even known beforehand, by the person writing the Flink analysis program.
Paul
|
Just checking, all the elements that would fall into a window of length X also have X as a property? In that case you should be able to do something like this:
public Collection<TimeWindow> assignWindows(PojoType element, long timestamp, WindowAssignerContext context) {
    long size = element.windowSize;
    long slide = element.windowSlide;
    if (timestamp > Long.MIN_VALUE) {
        List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
        long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
        for (long start = lastStart;
                start > timestamp - size;
                start -= slide) {
            windows.add(new TimeWindow(start, start + size));
        }
        return windows;
    } else {
        throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
            "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
            "'DataStream.assignTimestampsAndWatermarks(...)'?");
    }
}
This is basically a copy of SlidingEventTimeWindows where assignWindows is changed for the above and where size/slide are not stored in the assigner but are read from the object. This only works if all elements of a key that should fall into the same windows have the same size/slide property, otherwise they would spawn different windows.
Cheers,
Aljoscha
On Tue, 30 Aug 2016 at 21:28 Paul Joireman <[hidden email]> wrote:
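For readers who want to try the window arithmetic without a Flink job, here is a minimal plain-Java sketch of the same loop. It is not Flink code: the `Event` class, the `windowStart` helper and the `long[]` window representation are hypothetical stand-ins for the POJO, `TimeWindow.getWindowStartWithOffset` and `TimeWindow`, and the helper's `Math.floorMod` arithmetic is an assumption that may differ from Flink's own in edge cases.

```java
import java.util.ArrayList;
import java.util.List;

public class DynamicSlidingWindows {

    /** Hypothetical stand-in for the stream POJO: it carries its own window size and slide. */
    static final class Event {
        final long timestamp;
        final long windowSize;
        final long windowSlide;

        Event(long timestamp, long windowSize, long windowSlide) {
            this.timestamp = timestamp;
            this.windowSize = windowSize;
            this.windowSlide = windowSlide;
        }
    }

    /** Start of the latest slide-aligned window containing the timestamp (assumed arithmetic). */
    static long windowStart(long timestamp, long offset, long slide) {
        return timestamp - Math.floorMod(timestamp - offset, slide);
    }

    /** Same loop as in the reply above; each window is returned as {start, end}, end exclusive. */
    static List<long[]> assignWindows(Event e, long offset) {
        if (e.windowSize <= 0 || e.windowSlide <= 0) {
            throw new IllegalArgumentException("window size and slide must be positive");
        }
        List<long[]> windows = new ArrayList<>();
        long lastStart = windowStart(e.timestamp, offset, e.windowSlide);
        for (long start = lastStart; start > e.timestamp - e.windowSize; start -= e.windowSlide) {
            windows.add(new long[] {start, start + e.windowSize});
        }
        return windows;
    }

    public static void main(String[] args) {
        // Same timestamp, but two different size settings (all values in milliseconds).
        Event shortWindows = new Event(27_000L, 10_000L, 5_000L);
        Event longWindows = new Event(27_000L, 20_000L, 5_000L);
        for (long[] w : assignWindows(shortWindows, 0L)) {
            System.out.println("size 10s: [" + w[0] + ", " + w[1] + ")");
        }
        for (long[] w : assignWindows(longWindows, 0L)) {
            System.out.println("size 20s: [" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

With a 10 s size and 5 s slide, the element at 27 000 ms lands in [25000, 35000) and [20000, 30000). With a 20 s size it lands in four windows, none of which has the same bounds as the first two. That is the caveat in concrete form: elements of one key with different size/slide values never share a window.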
|
Thanks Aljoscha,
The elements can certainly be configured to carry the window size and slide, and those values will not change for the same elements. There may be a different class of elements with a different window or slide, but those values will be essentially final for that class of elements.
Paul
From: Aljoscha Krettek <[hidden email]>
Sent: Wednesday, August 31, 2016 10:53:17 AM
To: [hidden email]
Cc: Chad Conkright
Subject: Re: Setting EventTime window width using stream data