DataStream idleDuration = cleanedTrips .keyBy("license") .flatMap(new DetermineIdleDuration()) .filter(duration -> duration.avg_idle_duration >= 0 && duration.avg_idle_duration <= 240) .keyBy("location") .timeWindow(Time.minutes(10)) .apply((Tuple tuple, TimeWindow window, Iterable input, Collector out) -> { double[] location = Iterables.get(input, 0).location; double avgDuration = StreamSupport .stream(input.spliterator(), false) .mapToDouble(idle -> idle.avg_idle_duration) .average() .getAsDouble(); out.collect(new IdleDuration(location, avgDuration, window.maxTimestamp())); }); static class DetermineIdleDuration extends RichFlatMapFunction { private transient ValueState tripCache; private final DecimalFormat df = new DecimalFormat("#.###"); @Override public void flatMap(TripEvent currentTrip, Collector out) throws Exception { TripEvent previousTrip = tripCache.value(); if (previousTrip != null) { double lat = Double.parseDouble(df.format(previousTrip.dropoff_lat)); double lon = Double.parseDouble(df.format(previousTrip.dropoff_lon)); long idle = new Duration(previousTrip.dropoff_time, currentTrip.pickup_time).getStandardMinutes(); long timestamp = currentTrip.pickup_time.getMillis(); out.collect(new IdleDuration(lat, lon, idle, timestamp)); } tripCache.update(currentTrip); } @Override public void open(Configuration config) { ValueStateDescriptor descriptor = new ValueStateDescriptor<>("previous trip", TypeInformation.of(new TypeHint() {}), null); tripCache = getRuntimeContext().getState(descriptor); } } public class IdleDuration extends Document { public final double[] location; public final double avg_idle_duration; public IdleDuration() { super(-1L); location = null; avg_idle_duration = 0; } public IdleDuration(double[] location, double avg_idle_duration, long timestamp) { super(timestamp); this.location = location; this.avg_idle_duration = avg_idle_duration; } public IdleDuration(double lat, double lon, double avg_idle_duration, long timestamp) { super(timestamp); this.avg_idle_duration = avg_idle_duration; this.location = new double[] {lon, lat}; } }