Hi @all,
I am using TumblingEventTimeWindows.of(Time.seconds(20)) for testing. While doing this I found a strange behavior (at least to me) in the assignment of events: the first element of a new window actually always belongs to the old window. I thought the events might be late, but then they would be dropped instead of being assigned to the new window. Even with an allowedLateness of 10s the behavior remains the same. I used timeWindow.getStart() and getEnd() to get the boundaries of the window. Can someone explain this?

Best,
Nico

TimeWindows with elements:

    Start: 1482332940000 - End: 1482332960000
    timestamp=1482332952907

    Start: 1482332960000 - End: 1482332980000
    timestamp=1482332958929
    timestamp=1482332963995
    timestamp=1482332969027
    timestamp=1482332974039

    Start: 1482332980000 - End: 1482333000000
    timestamp=1482332979059
    timestamp=1482332984072
    timestamp=1482332989081
    timestamp=1482332994089

    Start: 1482333000000 - End: 1482333020000
    timestamp=1482332999113
    timestamp=1482333004123
    timestamp=1482333009132
    timestamp=1482333014144
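A quick way to check which window each printed timestamp should fall into is plain window-start arithmetic. This is a minimal sketch, assuming a tumbling event-time window assigner with no offset computes the start as timestamp - (timestamp % size); the class TumblingWindowMath is hypothetical and not part of Flink. By this arithmetic, timestamp=1482332958929 belongs to [1482332940000, 1482332960000), not to the window it was printed under, which suggests the timestamp used for window assignment is not the one being printed.

```java
// Hypothetical helper, not part of Flink: mirrors the arithmetic a tumbling
// event-time window assigner (offset 0) uses to map a timestamp to a window.
public class TumblingWindowMath {

    // Start of the window containing `timestamp` for windows of `size` ms.
    // Assumes non-negative timestamps (Java's % is negative for negative inputs).
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Windows are half-open intervals [start, start + size).
    static long windowEnd(long timestamp, long size) {
        return windowStart(timestamp, size) + size;
    }

    public static void main(String[] args) {
        long size = 20_000L; // Time.seconds(20)
        long[] timestamps = {1482332952907L, 1482332958929L, 1482332979059L, 1482332999113L};
        for (long ts : timestamps) {
            System.out.println("timestamp=" + ts + " -> ["
                    + windowStart(ts, size) + ", " + windowEnd(ts, size) + ")");
        }
    }
}
```

For each of the "first elements" in the listing above (…958929, …979059, …999113), the computed window is the one before the window it was printed in.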
Hi, could you please share code (and example data) for producing this output. I'd like to have a look. Cheers, Aljoscha On Wed, 21 Dec 2016 at 16:29 Nico <[hidden email]> wrote:
Hi Aljoscha,

thank you for having a look. Actually, there is not much code based on timestamps:

    stream
        .keyBy("id")
        .map(...)
        .filter(...)
        .map(...)
        .keyBy("areaID")
        .map(new KeyExtractor())
        .keyBy("f1.areaID", "f0.sinterval")
        .window(TumblingEventTimeWindows.of(Time.seconds(20)))
        .apply(new TrafficInformation());

The map functions only enrich the data and don't change anything related to the timestamp. The apply function is:

    @Override
    public void apply(
            Tuple key,
            TimeWindow timeWindow,
            Iterable<Tuple2<DirectionInterval, Car>> cars,
            Collector<Tuple3<String, Double, Double>> out) throws Exception {
        System.out.println("Start: " + timeWindow.getStart());
        System.out.println("End: " + timeWindow.getEnd());
        for (Tuple2<DirectionInterval, Car> t : cars) {
            System.out.println(t.f1);
        }
    }

System.out.println(t.f1) prints all information about a car, including its embedded timestamp. The system gets the timestamp with this class:

    public class TimestampGenerator extends BoundedOutOfOrdernessTimestampExtractor<Car> {

        public TimestampGenerator(Time maxOutOfOrderness) {
            super(maxOutOfOrderness);
        }

        @Override
        public long extractTimestamp(Car car) {
            return car.getTimestamp();
        }
    }

Example output is presented in the previous post... it looks like the timestamp is rounded... I am confused :-/

Best,
Nico

2016-12-23 19:41 GMT+01:00 Aljoscha Krettek <[hidden email]>:
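For reference, here is a simplified, Flink-free model of what a bounded-out-of-orderness extractor like TimestampGenerator does, assuming it tracks the highest timestamp seen so far and reports a watermark that lags behind it by maxOutOfOrderness. The class and method names are hypothetical; the real extractor is driven by the Flink runtime rather than called directly. The relevant point for this thread is that the value returned by extractTimestamp() becomes the record's timestamp at that position in the pipeline.

```java
// Hypothetical, Flink-free model of a bounded-out-of-orderness timestamp
// extractor; names are illustrative, not Flink's actual implementation.
public class BoundedOutOfOrdernessSketch {

    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
        // Chosen so the initial watermark is Long.MIN_VALUE without underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    // Returns the event's own timestamp and remembers the largest one seen.
    long extractTimestamp(long eventTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
        return eventTimestamp;
    }

    // Watermark: no more elements with a timestamp <= this value are expected.
    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch extractor = new BoundedOutOfOrdernessSketch(0L);
        for (long ts : new long[] {1482332952907L, 1482332958929L, 1482332963995L}) {
            extractor.extractTimestamp(ts);
            System.out.println("timestamp=" + ts + " watermark=" + extractor.currentWatermark());
        }
    }
}
```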
Hi,

I'm assuming you also have the call to assignTimestampsAndWatermarks() somewhere in there, as in:

    stream
        .assignTimestampsAndWatermarks(new TimestampGenerator()) // or somewhere else in the pipeline
        .keyBy("id")
        .map(...)
        .filter(...)
        .map(...)
        .keyBy("areaID")
        .map(new KeyExtractor())
        .keyBy("f1.areaID", "f0.sinterval")
        .window(TumblingEventTimeWindows.of(Time.seconds(20)))
        .apply(new TrafficInformation());

Just checking, to make sure. If you have this, we might have to dig a little deeper. Could you also please try to print the whole output of your apply() method in one go, i.e. collect all the output in a String and then make a single call to System.out.println()? It could be that the output in the terminal is not completely in order.

Cheers,
Aljoscha

On Mon, 2 Jan 2017 at 15:04 Nico <[hidden email]> wrote:
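Collecting the window output into one string before printing could look roughly like this. It assumes the goal is only to keep the lines of one window firing together in the terminal; WindowReport and format() are hypothetical names, not Flink API. Inside apply() one would pass timeWindow.getStart(), timeWindow.getEnd() and the timestamps of the window's elements.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not Flink API): builds the complete report for one
// window firing so it can be printed with a single System.out.println() call.
public class WindowReport {

    static String format(long start, long end, List<Long> timestamps) {
        StringBuilder sb = new StringBuilder();
        sb.append("Start: ").append(start).append(" - End: ").append(end);
        for (long ts : timestamps) {
            // One line per element, all inside the same string.
            sb.append(System.lineSeparator()).append("timestamp=").append(ts);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String report = format(1482332940000L, 1482332960000L,
                Arrays.asList(1482332952907L));
        System.out.println(report); // a single call per window
    }
}
```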
Hi Aljoscha,

I was able to identify the root cause of the problem: it is my first map function, the one using ValueState. But first, the assignTimestampsAndWatermarks( FlinkKafkaConsumer09<Car> carFlinkKafkaConsumer09 =

In the map function I try to calculate the direction between two GPS data points. For this, I store the last event in ValueState. The function looks like this:

    private static class BearingMap extends RichMapFunction<Car, Car> { }

Together with the window function:

    private static class TimeWindowTest implements WindowFunction<Car, Tuple9<Double, Double, Double, Double, Double, Double, Double, Integer, List<String>>, Tuple, TimeWindow> {

2017-01-09 16:10 GMT+01:00 Aljoscha Krettek <[hidden email]>:
Hi,

can anyone help me with this problem? I don't get it. Forget the earlier examples; I've created a copy/paste example that reproduces the problem of incorrect results when using key-value state and the window operator.

    import java.util.HashSet;
    import java.util.Set;

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    public class StreamingJob {

        public static void main(String[] args) throws Exception {
            // set up the streaming execution environment
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            DataStream<Tuple2<String, Long>> stream = env.fromElements(
                    new Tuple2<>("1", 1485446260994L),
                    new Tuple2<>("1", 1485446266012L),
                    new Tuple2<>("1", 1485446271031L),
                    new Tuple2<>("1", 1485446276040L),
                    new Tuple2<>("1", 1485446281045L),
                    new Tuple2<>("1", 1485446286049L),
                    new Tuple2<>("1", 1485446291062L),
                    new Tuple2<>("1", 1485446296066L),
                    new Tuple2<>("1", 1485446302019L)
            );

            stream
                    // timestamps are assigned from f1, before the stateful map
                    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(0)) {
                        @Override
                        public long extractTimestamp(Tuple2<String, Long> stringLongTuple2) {
                            return stringLongTuple2.f1;
                        }
                    })
                    .keyBy("f0")
                    .map(new MapTest())
                    .keyBy("f0")
                    .window(TumblingEventTimeWindows.of(Time.seconds(20)))
                    .apply(new WindowFunction<Tuple2<String, Long>, Object, Tuple, TimeWindow>() {
                        @Override
                        public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<String, Long>> iterable, Collector<Object> collector) throws Exception {
                            Set<Long> set = new HashSet<>();
                            for (Tuple2<String, Long> t : iterable) {
                                set.add(t.f1);
                            }
                            StringBuilder sb = new StringBuilder();
                            sb.append("Window [" + timeWindow.getStart() + " " + timeWindow.getEnd() + "] ");
                            sb.append("Set " + set.toString());
                            System.out.println(sb.toString());
                        }
                    })
                    .print();

            // execute program
            env.execute("Flink Streaming Java API Skeleton");
        }

        private static class MapTest extends RichMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

            private transient ValueState<Tuple2<String, Long>> state;

            @Override
            public Tuple2<String, Long> map(Tuple2<String, Long> stringLongTuple2) throws Exception {
                // Emit the previously stored event for this key and remember the current one;
                // for the very first event nothing is stored yet, so it is emitted directly.
                Tuple2<String, Long> t = state.value();
                state.update(stringLongTuple2);
                if (t == null) {
                    return stringLongTuple2;
                }
                return t;
            }

            @Override
            public void open(Configuration parameters) throws Exception {
                ValueStateDescriptor<Tuple2<String, Long>> vsd = new ValueStateDescriptor<>(
                        "lastEvent",
                        TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}),
                        null);
                state = getRuntimeContext().getState(vsd);
            }
        }
    }

Output:

    Window [1485446260000 1485446280000] Set [1485446271031, 1485446260994, 1485446266012]
    Window [1485446280000 1485446300000] Set [1485446291062, 1485446281045, 1485446286049, 1485446276040]
    Window [1485446300000 1485446320000] Set [1485446296066]

Best,
Nico

BTW ... I am using Flink 1.1.3.

2017-01-16 12:18 GMT+01:00 Nico <[hidden email]>:
Now I see. What you're doing in this example is basically reassigning timestamps to other elements in your stateful MapFunction. Flink internally keeps track of the timestamp of each element. This timestamp normally cannot be changed, except by using a TimestampAssigner, which you're doing. Now, the output of a MapFunction has the same timestamp as its input element. By keeping an element in state and emitting it when the next element arrives, you emit it with the timestamp of that next element, and that's the reason why the elements end up in the "wrong" windows.

Does that help?

- Aljoscha

On Thu, 26 Jan 2017 at 19:17 Nico <[hidden email]> wrote:
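The effect described above can be reproduced without Flink. The sketch below is a plain-Java model, assuming that a record's value and its runtime timestamp are tracked separately, that a map output inherits the timestamp of the input element currently being processed, and that windows start at timestamp - (timestamp % size). StatefulMapTimestampDemo is a hypothetical name; the input values are the ones from Nico's example.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Plain-Java model (no Flink): the emitted value and the timestamp the
// runtime attaches to it are tracked as two separate things.
public class StatefulMapTimestampDemo {

    static final long WINDOW_SIZE = 20_000L; // Time.seconds(20)

    static final long[] INPUT = {
        1485446260994L, 1485446266012L, 1485446271031L, 1485446276040L,
        1485446281045L, 1485446286049L, 1485446291062L, 1485446296066L,
        1485446302019L
    };

    // window start -> set of emitted values that landed in that window
    static Map<Long, TreeSet<Long>> run() {
        Map<Long, TreeSet<Long>> windows = new TreeMap<>();
        Long lastEvent = null; // plays the role of the ValueState
        for (long current : INPUT) {
            // The map emits the previous event (or the current one the first time)...
            long emittedValue = (lastEvent == null) ? current : lastEvent;
            lastEvent = current;
            // ...but the output keeps the timestamp of the element being processed.
            long carriedTimestamp = current;
            long start = carriedTimestamp - (carriedTimestamp % WINDOW_SIZE);
            windows.computeIfAbsent(start, k -> new TreeSet<>()).add(emittedValue);
        }
        return windows;
    }

    public static void main(String[] args) {
        for (Map.Entry<Long, TreeSet<Long>> e : run().entrySet()) {
            System.out.println("Window [" + e.getKey() + " " + (e.getKey() + WINDOW_SIZE)
                    + "] Set " + e.getValue());
        }
    }
}
```

Running main() gives the same window boundaries and set contents as the output in Nico's example (in sorted order here, because a TreeSet is used instead of a HashSet).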
Hi Aljoscha,

got it!!! :) Thank you. So, in order to retain the "original" timestamps, it would be necessary to assign the timestamps after the MapFunction instead of at the Kafka source? At least, this solves the issue in the example.

Best,
Nico

2017-01-27 11:49 GMT+01:00 Aljoscha Krettek <[hidden email]>:
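Under the same modelling assumptions, moving timestamp assignment behind the stateful map means the timestamp is taken from the emitted record itself. In this plain-Java model (ReassignAfterMapDemo is hypothetical), each record then lands in the window of its own f1 value: 1485446260994, 1485446266012, 1485446271031 and 1485446276040 in the first window and the next four values in the second. The last input element is never emitted in this model, because the map only releases an element when the next one arrives.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Plain-Java model (no Flink) of assigning timestamps *after* the stateful map:
// the window is chosen from the emitted record's own value (its f1 field).
public class ReassignAfterMapDemo {

    static final long WINDOW_SIZE = 20_000L; // Time.seconds(20)

    static final long[] INPUT = {
        1485446260994L, 1485446266012L, 1485446271031L, 1485446276040L,
        1485446281045L, 1485446286049L, 1485446291062L, 1485446296066L,
        1485446302019L
    };

    static Map<Long, TreeSet<Long>> run() {
        Map<Long, TreeSet<Long>> windows = new TreeMap<>();
        Long lastEvent = null; // plays the role of the ValueState
        for (long current : INPUT) {
            long emittedValue = (lastEvent == null) ? current : lastEvent;
            lastEvent = current;
            // Timestamp extracted from the emitted record, not inherited from the input.
            long timestamp = emittedValue;
            long start = timestamp - (timestamp % WINDOW_SIZE);
            windows.computeIfAbsent(start, k -> new TreeSet<>()).add(emittedValue);
        }
        return windows;
    }

    public static void main(String[] args) {
        for (Map.Entry<Long, TreeSet<Long>> e : run().entrySet()) {
            System.out.println("Window [" + e.getKey() + " " + (e.getKey() + WINDOW_SIZE)
                    + "] Set " + e.getValue());
        }
    }
}
```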
Yes, that's true. On Fri, 27 Jan 2017 at 13:16 Nico <[hidden email]> wrote: