Hey Fabian, it seems as simple as this ?
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Collection;
import java.util.Collections;
/**
* A {@link WindowAssigner} that windows elements into windows based on the timestamp of the
* elements and the date zone the offset it computed off. Windows cannot overlap.
*
* <p>For example, in order to window into windows of 1 minute:
* <pre> {@code
* DataStream<Tuple2<String, Integer>> in = ...;
* KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
* WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
* keyed.window(TimeZoneAwareTumblingEventTimeWindows.of(Time.minutes(1), ZoneId.of(AMERICA_NEW_YORK_ZONE)));
* } </pre>
*/
public class TimeZoneAwareTumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private final long size;
private final ZoneId zoneID;
protected TimeZoneAwareTumblingEventTimeWindows(long size, ZoneId zoneID) {
this.size = size;
this.zoneID = zoneID;
}
/**
* Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns
* elements to time windows based on the element timestamp.
*
* @param size The size of the generated windows.
* @return The time policy.
*/
public static TimeZoneAwareTumblingEventTimeWindows of(Time size, ZoneId zoneID) {
return new TimeZoneAwareTumblingEventTimeWindows(size.toMilliseconds(), zoneID);
}
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
if (timestamp > Long.MIN_VALUE) {
// Long.MIN_VALUE is currently assigned when no timestamp is present
ZonedDateTime dateTime = Instant.ofEpochMilli(timestamp).atZone(zoneID);
long start = TimeWindow.getWindowStartWithOffset(timestamp, dateTime.getOffset().getTotalSeconds()*1000l, size);
return Collections.singletonList(new TimeWindow(start, start + size));
} else {
throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
"'DataStream.assignTimestampsAndWatermarks(...)'?");
}
}
@Override
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return EventTimeTrigger.create();
}
@Override
public String toString() {
return "TumblingEventTimeWindows(" + size + ")";
}
@Override
public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new TimeWindow.Serializer();
}
@Override
public boolean isEventTime() {
return true;
}
}