Events are assigned to wrong window

nico-2
Hi @all,

I am using TumblingEventTimeWindows.of(Time.seconds(20)) for testing. While doing so, I noticed a behavior in how events are assigned to windows that is strange (at least to me).

The first element of each new window actually always belongs to the previous window, judging by its timestamp. I thought the events might be late, but then they would be dropped instead of being assigned to the new window. Even with an allowedLateness of 10s, the behavior stays the same.

I used timeWindow.getStart() and timeWindow.getEnd() to get the boundaries of each window.

Can someone explain this? 

Best,
Nico


TimeWindows with Elements: 

Start: 1482332940000 - End: 1482332960000
timestamp=1482332952907

Start: 1482332960000 - End: 1482332980000
timestamp=1482332958929
timestamp=1482332963995
timestamp=1482332969027
timestamp=1482332974039

Start: 1482332980000 - End: 1482333000000
timestamp=1482332979059
timestamp=1482332984072
timestamp=1482332989081
timestamp=1482332994089

Start: 1482333000000 - End: 1482333020000
timestamp=1482332999113
timestamp=1482333004123
timestamp=1482333009132
timestamp=1482333014144
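One way to check which window each printed timestamp should land in is to redo the window assigner's arithmetic by hand. The sketch below assumes a zero window offset and non-negative epoch-millisecond timestamps, in which case a tumbling event-time window of size `size` starts at `timestamp - (timestamp % size)`. The class and method names are hypothetical and are not part of Flink.

```java
/** Hypothetical helper: recompute tumbling-window boundaries (zero offset, non-negative timestamps assumed). */
class TumblingWindowMath {

    /** Inclusive start of the tumbling window that contains the timestamp. */
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    /** Exclusive end of that window. */
    static long windowEnd(long timestamp, long size) {
        return windowStart(timestamp, size) + size;
    }

    public static void main(String[] args) {
        long size = 20_000L; // Time.seconds(20) in milliseconds
        long[] timestamps = {1482332952907L, 1482332958929L, 1482332963995L, 1482332979059L, 1482332999113L};
        for (long t : timestamps) {
            System.out.println(t + " -> [" + windowStart(t, size) + ", " + windowEnd(t, size) + ")");
        }
    }
}
```

By this arithmetic, 1482332958929 falls into [1482332940000, 1482332960000), not the window it was printed under, so the window operator must have used a different timestamp for that element than the one printed from the event itself.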
Re: Events are assigned to wrong window

Aljoscha Krettek
Hi,
Could you please share the code (and example data) that produces this output? I'd like to have a look.

Cheers,
Aljoscha

On Wed, 21 Dec 2016 at 16:29, Nico wrote:
[...]

Re: Events are assigned to wrong window

nico-2
Hi Aljoscha,

Thank you for having a look. There actually isn't much code that deals with timestamps:

stream
      .keyBy("id")
      .map(...)
      .filter(...)
      .map(...)
      .keyBy("areaID")
      .map(new KeyExtractor())
      .keyBy("f1.areaID","f0.sinterval")
      .window(TumblingEventTimeWindows.of(Time.seconds(20)))
      .apply(new TrafficInformation());

The map functions only enrich the data and don't change anything related to the timestamp.

The apply function is:

@Override
public void apply(
        Tuple key,
        TimeWindow timeWindow,
        Iterable<Tuple2<DirectionInterval, Car>> cars,
        Collector<Tuple3<String, Double, Double>> out) throws Exception {

    System.out.println("Start: " + timeWindow.getStart());
    System.out.println("End: " + timeWindow.getEnd());

    for (Tuple2<DirectionInterval, Car> t : cars) {
        System.out.println(t.f1);
    }
}

System.out.println(t.f1) prints all information about a car, including its embedded timestamp. The timestamps are extracted with this class:

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TimestampGenerator extends BoundedOutOfOrdernessTimestampExtractor<Car> {

    public TimestampGenerator(Time maxOutOfOrderness) {
        super(maxOutOfOrderness);
    }

    @Override
    public long extractTimestamp(Car car) {
        return car.getTimestamp();
    }
}
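For context on the lateness question from the first post: an extractor like the one above derives the watermark from the largest timestamp seen so far minus the configured bound, and the default event-time trigger fires a window once the watermark reaches the window's last millisecond (end - 1). The plain-Java sketch below models only that idea. It is not Flink's class, its names are hypothetical, and Flink emits such watermarks periodically rather than after every element. With `Time.seconds(0)`, the watermark simply trails the maximum timestamp.

```java
/** Hypothetical model of bounded-out-of-orderness watermarking (not Flink code). */
class BoundedWatermarkSketch {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;

    BoundedWatermarkSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
        // Start low enough that the first watermark cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    /** Called with each element's extracted event timestamp. */
    void onElement(long timestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
    }

    /** Watermark = largest timestamp seen so far minus the allowed out-of-orderness. */
    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrderness;
    }

    /** An event-time window [start, end) is complete once the watermark reaches end - 1. */
    static boolean windowCanFire(long watermark, long windowEnd) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch wm = new BoundedWatermarkSketch(0L); // like Time.seconds(0)
        long windowEnd = 1482332980000L;
        for (long t : new long[] {1482332974039L, 1482332979059L, 1482332984072L}) {
            wm.onElement(t);
            System.out.println("after " + t + ": watermark=" + wm.currentWatermark()
                    + ", window ending at " + windowEnd + " can fire: "
                    + windowCanFire(wm.currentWatermark(), windowEnd));
        }
    }
}
```

An out-of-order element never moves the watermark backwards; it only matters whether its own timestamp is already behind the watermark when it reaches the window operator.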


Example output is in my previous post... It looks as if the timestamps were rounded... I am confused :-/

Best,
Nico

2016-12-23 19:41 GMT+01:00 Aljoscha Krettek:
[...]

Re: Events are assigned to wrong window

Aljoscha Krettek
Hi,
I'm assuming you also call assignTimestampsAndWatermarks() somewhere in there, as in:

stream
      .assignTimestampsAndWatermarks(new TimestampGenerator()) // or somewhere else in the pipeline
      .keyBy("id")
      .map(...)
      .filter(...)
      .map(...)
      .keyBy("areaID")
      .map(new KeyExtractor())
      .keyBy("f1.areaID","f0.sinterval")
      .window(TumblingEventTimeWindows.of(Time.seconds(20)))
      .apply(new TrafficInformation());

Just checking to make sure. If you have this, we might have to dig a little deeper. Could you also please try to print the whole output of your apply() method in one go, i.e. collect all the output in a String and then make a single call to System.out.println()? It could be that the output in the terminal is not completely in order.
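A sketch of what this suggestion could look like, assuming the same information Nico prints: the window bounds plus the event timestamps. The hypothetical `describe` helper builds the whole report as one String, so output from parallel instances cannot interleave within it. It sorts the timestamps for readability, because a HashSet prints in no particular order, and flags any timestamp outside [start, end). Inside an apply() method, one would call System.out.println(describe(...)) exactly once.

```java
import java.util.Arrays;
import java.util.TreeSet;

/** Hypothetical helper: render one window's contents as a single string. */
class WindowReport {

    static String describe(long start, long end, Iterable<Long> timestamps) {
        TreeSet<Long> sorted = new TreeSet<>(); // sorted and de-duplicated
        for (Long t : timestamps) {
            sorted.add(t);
        }
        StringBuilder sb = new StringBuilder();
        sb.append("Window [").append(start).append(", ").append(end).append(')');
        for (Long t : sorted) {
            sb.append("\n  timestamp=").append(t);
            if (t < start || t >= end) {
                sb.append(" (outside window!)");
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // One println per window keeps the lines of a report together.
        System.out.println(describe(1482332960000L, 1482332980000L,
                Arrays.asList(1482332958929L, 1482332963995L, 1482332969027L, 1482332974039L)));
    }
}
```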

Cheers,
Aljoscha

On Mon, 2 Jan 2017 at 15:04, Nico wrote:
[...]

Re: Events are assigned to wrong window

nico-2
Hi Aljoscha,

I was able to identify the root cause of the problem: it is my first map function, the one that uses ValueState. But first, for completeness: assignTimestampsAndWatermarks() is called right after the Kafka connector is created:

FlinkKafkaConsumer09<Car> carFlinkKafkaConsumer09 =
        new FlinkKafkaConsumer09<>("Traffic", new Car(), properties);

// Extract the timestamps with a max. delay of 2s
carFlinkKafkaConsumer09.assignTimestampsAndWatermarks(new TimestampGenerator(Time.seconds(0)));

In the map function, I try to calculate the direction between two GPS data points. For this, I store the last event in ValueState. The function looks like this:

private static class BearingMap extends RichMapFunction<Car, Car> {

    private transient ValueState<Car> state;
    private final double maxdiff = 12; // in seconds

    @Override
    public Car map(Car destination) throws Exception {

        Car origin = state.value();

        /*
         * If the state is empty, don't compute a direction; just store the event in state
         */
        if (origin == null) {
            state.update(destination);
            // return the Car unchanged
            return destination;
        }

        double diff = origin.getTimestamp() - destination.getTimestamp();

        System.out.println("Differenz: " + diff);

        if (Math.abs(diff) <= maxdiff * 1000) {

            /*
             * For late events that still fall within the allowed delay
             */
            if (diff > 0) {
                Car tmp = destination;
                destination = origin;
                origin = tmp;
            }

            /*
             * From here on, origin is always the earlier of the two events
             */

            double bearing = Helper.calculateBearing(
                    origin.getLat(), origin.getLon(), destination.getLat(), destination.getLon());

            // Update the state
            state.update(destination);

            origin.setDirection(bearing);
            return origin;
        }

        // For events that are too late, keep the current state and return it without a direction
        return origin;
    }

    @Override
    public void open(Configuration parameters) throws Exception {

        ValueStateDescriptor<Car> vsd = new ValueStateDescriptor<>(
                "lastEvent",
                Car.class,
                null
        );

        state = getRuntimeContext().getState(vsd);
    }
}

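`Helper.calculateBearing` is not shown in the thread. For readers following along, one common choice for the direction between two GPS points is the initial great-circle bearing, sketched below with a hypothetical class name and the result in degrees clockwise from north, in [0, 360). This is an assumption about what such a helper might compute, not Nico's implementation.

```java
/** Hypothetical stand-in for a bearing helper: initial great-circle bearing in degrees. */
class BearingSketch {

    /** Bearing from (lat1, lon1) to (lat2, lon2); inputs in decimal degrees, result in [0, 360). */
    static double initialBearing(double lat1, double lon1, double lat2, double lon2) {
        double phi1 = Math.toRadians(lat1);
        double phi2 = Math.toRadians(lat2);
        double deltaLambda = Math.toRadians(lon2 - lon1);

        double y = Math.sin(deltaLambda) * Math.cos(phi2);
        double x = Math.cos(phi1) * Math.sin(phi2)
                - Math.sin(phi1) * Math.cos(phi2) * Math.cos(deltaLambda);

        double degrees = Math.toDegrees(Math.atan2(y, x));
        return (degrees + 360.0) % 360.0; // normalise from (-180, 180] to [0, 360)
    }

    public static void main(String[] args) {
        System.out.println(initialBearing(0, 0, 0, 1)); // due east, about 90
        System.out.println(initialBearing(0, 0, 1, 0)); // due north, 0
    }
}
```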
Together with the window function:

private static class TimeWindowTest implements WindowFunction<Car, Tuple9<Double, Double, Double, Double, Double, Double, Double, Integer, List<String>>, Tuple, TimeWindow> {
    @Override
    public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Car> iterable,
                      Collector<Tuple9<Double, Double, Double, Double, Double, Double, Double, Integer, List<String>>> collector) throws Exception {
        String s = "Zeitfenster: " + timeWindow.getStart() + " - " + timeWindow.getEnd() + "\n";
        Set<Long> timestamps = new HashSet<Long>();

        for (Car c : iterable) {
            timestamps.add(c.getTimestamp());
        }

        System.out.println(s + timestamps + "\n\n");
    }
}

For this pipeline:

stream
      .filter(new FilterFunction<Car>() {
          @Override
          public boolean filter(Car car) throws Exception {
              return car.getId().equals("car.330");
          }
      })
      .keyBy("id")
      .map(new BearingMap())
      .keyBy("id")
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      .apply(new TimeWindowTest());

I get the output below. When an event e1 arrives at the map operator, it is stored in ValueState, and e1 is only forwarded once the next element e2 arrives, i.e. about 5 seconds later. As a result, every window contains one element whose timestamp lies a few seconds before the start of that window ("Differenz" = difference, "Zeitfenster" = time window):

Differenz: -5013.0
Differenz: -5014.0
Zeitfenster: 1484564690000 - 1484564700000 (Window times start and end)
[1484564686236, 1484564691260]


Differenz: -5009.0
Differenz: -5007.0
Zeitfenster: 1484564700000 - 1484564710000
[1484564696273, 1484564701287]


Differenz: -5005.0
Differenz: -5014.0
Zeitfenster: 1484564710000 - 1484564720000
[1484564706296, 1484564711303]

Best,
Nico
 

2017-01-09 16:10 GMT+01:00 Aljoscha Krettek:
[...]

Re: Events are assigned to wrong window

nico-2
Hi,

Can anyone help me with this problem? I don't get it. Forget my earlier examples: I've created a copy-and-paste example that reproduces the incorrect results I get when combining key-value state with the window operator.


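import java.util.HashSet;
import java.util.Set;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<Tuple2<String, Long>> stream = env.fromElements(
                new Tuple2<>("1", 1485446260994L),
                new Tuple2<>("1", 1485446266012L),
                new Tuple2<>("1", 1485446271031L),
                new Tuple2<>("1", 1485446276040L),
                new Tuple2<>("1", 1485446281045L),
                new Tuple2<>("1", 1485446286049L),
                new Tuple2<>("1", 1485446291062L),
                new Tuple2<>("1", 1485446296066L),
                new Tuple2<>("1", 1485446302019L)
        );

        stream
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(Tuple2<String, Long> stringLongTuple2) {
                        return stringLongTuple2.f1;
                    }
                })
                .keyBy("f0")
                .map(new MapTest())
                .keyBy("f0")
                .window(TumblingEventTimeWindows.of(Time.seconds(20)))
                .apply(new WindowFunction<Tuple2<String, Long>, Object, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<String, Long>> iterable, Collector<Object> collector) throws Exception {

                        Set<Long> set = new HashSet<>();
                        for (Tuple2<String, Long> t : iterable) {
                            set.add(t.f1);
                        }

                        StringBuilder sb = new StringBuilder();

                        sb.append("Window [" + timeWindow.getStart() + " " + timeWindow.getEnd() + "] ");
                        sb.append("Set " + set.toString());
                        System.out.println(sb.toString());
                    }
                })
                .print();

        // execute program
        env.execute("Flink Streaming Java API Skeleton");
    }

    private static class MapTest extends RichMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

        private transient ValueState<Tuple2<String, Long>> state;

        @Override
        public Tuple2<String, Long> map(Tuple2<String, Long> stringLongTuple2) throws Exception {

            // the previously stored element (null for the first element of a key)
            Tuple2<String, Long> t = state.value();

            // remember the current element for the next call
            state.update(stringLongTuple2);

            if (t == null) return stringLongTuple2;

            // emit the previous element instead of the current one
            return t;
        }

        @Override
        public void open(Configuration parameters) throws Exception {

            ValueStateDescriptor<Tuple2<String, Long>> vsd = new ValueStateDescriptor<>(
                    "lastEvent",
                    TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}),
                    null
            );

            state = getRuntimeContext().getState(vsd);
        }
    }
}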


Output:

Window [1485446260000 1485446280000] Set [1485446271031, 1485446260994, 1485446266012]
Window [1485446280000 1485446300000] Set [1485446291062, 1485446281045, 1485446286049, 1485446276040]
Window [1485446300000 1485446320000] Set [1485446296066]
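The output above can be reproduced without Flink by replaying the same nine timestamps through a plain-Java model of the pipeline. The model rests on two assumptions that match the explanation given later in this thread: the map operator's output keeps the timestamp of the input element that triggered it, and the tumbling window (zero offset) is chosen from that record timestamp. The class and method names are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical plain-Java model of: keyBy -> MapTest (emit previous element) -> 20 s tumbling window. */
class BufferOneSimulation {

    /** Maps window start -> payload timestamps that the window function would see. */
    static Map<Long, TreeSet<Long>> simulate(long[] eventTimes, long windowSize) {
        Map<Long, TreeSet<Long>> windows = new TreeMap<>();
        Long previous = null; // plays the role of the ValueState "lastEvent"
        for (long recordTimestamp : eventTimes) {
            // MapTest returns the stored element, or the current one if nothing is stored yet...
            long payload = (previous == null) ? recordTimestamp : previous;
            previous = recordTimestamp;
            // ...but the emitted record keeps the timestamp of the element that just arrived.
            long windowStart = recordTimestamp - (recordTimestamp % windowSize);
            windows.computeIfAbsent(windowStart, k -> new TreeSet<>()).add(payload);
        }
        return windows;
    }

    public static void main(String[] args) {
        long[] input = {1485446260994L, 1485446266012L, 1485446271031L, 1485446276040L,
                1485446281045L, 1485446286049L, 1485446291062L, 1485446296066L, 1485446302019L};
        simulate(input, 20_000L).forEach((start, set) ->
                System.out.println("Window [" + start + " " + (start + 20_000L) + "] Set " + set));
    }
}
```

This prints the same three sets as the job (sorted rather than in HashSet order), including 1485446276040 in the second window and 1485446296066 alone in the third.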

Best,
Nico

BTW, I am using Flink 1.1.3.


2017-01-16 12:18 GMT+01:00 Nico:
[...]

Re: Events are assigned to wrong window

Aljoscha Krettek
Now I see. What you're doing in this example is basically reassigning timestamps to other elements in your stateful MapFunction. Flink internally keeps track of the timestamp of each element. This timestamp normally cannot be changed, except by using a TimestampAssigner, which you're doing. However, the output of a MapFunction always has the same timestamp as the input element. By keeping an element in state and emitting it only when the next element arrives, you emit it with the timestamp of that next element, and that's why the elements end up in the "wrong" windows.
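To make this concrete, the sketch below compares, for every element the stateful map emits, the window chosen from the inherited record timestamp with the window chosen from the element's own timestamp (what re-extracting the timestamp from the payload would give). It is a hypothetical plain-Java model with a zero window offset, not Flink API code. In Flink, one option consistent with the explanation above would be another TimestampAssigner after the stateful map, whose watermark bound would still need checking for the delayed output.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model: which emitted elements land in a different window than their own timestamp implies. */
class TimestampMismatch {

    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    /** Returns the payload timestamps whose window differs depending on which timestamp is used. */
    static List<Long> misassigned(long[] eventTimes, long windowSize) {
        List<Long> result = new ArrayList<>();
        Long previous = null; // buffered element, as in the stateful map
        for (long recordTimestamp : eventTimes) {
            long payload = (previous == null) ? recordTimestamp : previous;
            previous = recordTimestamp;
            // Inherited (record) timestamp vs. the emitted element's own timestamp.
            if (windowStart(recordTimestamp, windowSize) != windowStart(payload, windowSize)) {
                result.add(payload);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long[] input = {1485446260994L, 1485446266012L, 1485446271031L, 1485446276040L,
                1485446281045L, 1485446286049L, 1485446291062L, 1485446296066L, 1485446302019L};
        System.out.println("Placed in the wrong window: " + misassigned(input, 20_000L));
    }
}
```

Each misplaced element is the last one before a window boundary: it is emitted only after the first element of the next window has arrived, matching the 1485446276040 and 1485446296066 entries in Nico's output.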

Does that help?

-
Aljoscha 

On Thu, 26 Jan 2017 at 19:17 Nico <[hidden email]> wrote:
Hi,

can anyone help me with this problem? I don't get it. Forget the examples below, I've created a copy / paste example to reproduce the problem of incorrect results when using key-value state und windowOperator.


public class StreamingJob {

public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Tuple2<String,Long>> stream = env.fromElements(
new Tuple2<>("1",1485446260994L),
new Tuple2<>("1",1485446266012L),
new Tuple2<>("1",1485446271031L),
new Tuple2<>("1",1485446276040L),
new Tuple2<>("1",1485446281045L),
new Tuple2<>("1",1485446286049L),
new Tuple2<>("1",1485446291062L),
new Tuple2<>("1",1485446296066L),
new Tuple2<>("1",1485446302019L)
);

stream
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(0)) {
@Override
public long extractTimestamp(Tuple2<String, Long> stringLongTuple2) {
return stringLongTuple2.f1;
}
})
.keyBy("f0")
.map(new MapTest())
.keyBy("f0")
.window(TumblingEventTimeWindows.of(Time.seconds(20)))
.apply(new WindowFunction<Tuple2<String,Long>, Object, Tuple, TimeWindow>() {
@Override
public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<String, Long>> iterable, Collector<Object> collector) throws Exception {

Set<Long> set = new HashSet<>();
for(Tuple2<String,Long> t : iterable){
set.add(t.f1);
}

StringBuilder sb = new StringBuilder();

sb.append("Window [" +timeWindow.getStart() +" " +timeWindow.getEnd() +"] ");
sb.append("Set " +set.toString());
System.out.println(sb.toString());
}
})
.print();


// execute program
env.execute("Flink Streaming Java API Skeleton");
}

private static class MapTest extends RichMapFunction<Tuple2<String,Long>,Tuple2<String,Long>> {

private transient ValueState<Tuple2<String, Long>> state;

@Override
public Tuple2<String, Long> map(Tuple2<String, Long> stringLongTuple2) throws Exception {

Tuple2<String,Long> t = state.value();

state.update(stringLongTuple2);

if(t == null) return stringLongTuple2;

return t;
}

@Override
public void open(Configuration parameters) throws Exception {

ValueStateDescriptor<Tuple2<String,Long>> vsd = new ValueStateDescriptor<>(
"lastEvent",
TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}),
null
);

state = getRuntimeContext().getState(vsd);
}
}
}


Output:

Window [1485446260000 1485446280000] Set [1485446271031, 1485446260994, 1485446266012]
Window [1485446280000 1485446300000] Set [1485446291062, 1485446281045, 1485446286049, 1485446276040]
Window [1485446300000 1485446320000] Set [1485446296066]

Best,
Nico

BTW ... I am using Flink 1.1.3.


2017-01-16 12:18 GMT+01:00 Nico <[hidden email]>:
Hi Aljoscha,

is was able to identify the root cause of the problem. It is my first map function using the ValueState. But first, the assignTimestampsAndWatermarks() is called after the connector to Kafka is generated:

FlinkKafkaConsumer09<Car> carFlinkKafkaConsumer09  =
new FlinkKafkaConsumer09<>("Traffic", new Car(), properties);

// Extrahieren der Timestamps mit max. Delay von 2s
carFlinkKafkaConsumer09.assignTimestampsAndWatermarks(new TimestampGenerator(Time.seconds(0)));
In the map function I try to calculate the direction between two GPS data points. For this, I store the last event in ValueState. The function looks like this:
private static class BearingMap extends RichMapFunction<Car, Car> {

    private transient ValueState<Car> state;
    private final double maxdiff = 12; // in seconds

    @Override
    public Car map(Car destination) throws Exception {

        Car origin = state.value();

        // If the state is empty, don't compute a direction; just store the event in state
        if (origin == null) {
            state.update(destination);
            // return the Car unchanged
            return destination;
        }

        double diff = origin.getTimestamp() - destination.getTimestamp();

        System.out.println("Differenz: " + diff);

        if (Math.abs(diff) <= maxdiff * 1000) {

            // For late events that still fall within the allowed delay: swap the two points
            if (diff > 0) {
                Car tmp = destination;
                destination = origin;
                origin = tmp;
            }

            // From here on, origin is always the starting point

            double bearing = Helper.calculateBearing(
                    origin.getLat(), origin.getLon(), destination.getLat(), destination.getLon());

            // Update the state
            state.update(destination);

            origin.setDirection(bearing);
            return origin;
        }

        // For events that are too late, keep the current state and return it without a direction
        return origin;
    }

    @Override
    public void open(Configuration parameters) throws Exception {

        ValueStateDescriptor<Car> vsd = new ValueStateDescriptor<>(
                "lastEvent",
                Car.class,
                null
        );

        state = getRuntimeContext().getState(vsd);
    }
}
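Helper.calculateBearing is not shown in the thread. As a hedged sketch, assuming it computes the usual initial great-circle bearing in degrees clockwise from north (an assumption, not the author's code), a stdlib-only version could look like this; `BearingSketch` is a hypothetical name:

```java
// Hypothetical stand-in for Helper.calculateBearing, which the thread does not show.
// Computes the initial great-circle bearing from (lat1, lon1) to (lat2, lon2),
// in degrees clockwise from north, normalized to [0, 360).
final class BearingSketch {

    static double calculateBearing(double lat1, double lon1, double lat2, double lon2) {
        double phi1 = Math.toRadians(lat1);
        double phi2 = Math.toRadians(lat2);
        double deltaLambda = Math.toRadians(lon2 - lon1);

        double y = Math.sin(deltaLambda) * Math.cos(phi2);
        double x = Math.cos(phi1) * Math.sin(phi2)
                 - Math.sin(phi1) * Math.cos(phi2) * Math.cos(deltaLambda);

        // atan2 yields (-180, 180]; shift into [0, 360)
        double degrees = Math.toDegrees(Math.atan2(y, x));
        return (degrees + 360.0) % 360.0;
    }

    public static void main(String[] args) {
        System.out.println(calculateBearing(0.0, 0.0, 0.0, 1.0)); // due east, roughly 90
        System.out.println(calculateBearing(0.0, 0.0, 1.0, 0.0)); // due north, 0
    }
}
```

Note that this only covers the geometry; which of the two buffered points is treated as the origin is decided by the map function above.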
Together with the window function:

private static class TimeWindowTest implements WindowFunction<Car, Tuple9<Double, Double, Double, Double, Double, Double, Double, Integer, List<String>>, Tuple, TimeWindow> {
    @Override
    public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Car> iterable,
                      Collector<Tuple9<Double, Double, Double, Double, Double, Double, Double, Integer, List<String>>> collector) throws Exception {
        String s = "Zeitfenster: " + timeWindow.getStart() + " - " + timeWindow.getEnd() + "\n";
        Set<Long> timestamps = new HashSet<>();

        for (Car c : iterable) {
            timestamps.add(c.getTimestamp());
        }

        System.out.println(s + timestamps + "\n\n");
    }
}
With this pipeline:
stream
      .filter(new FilterFunction<Car>() {
          @Override
          public boolean filter(Car car) throws Exception {
              return car.getId().equals("car.330");
          }
      })
      .keyBy("id")
      .map(new BearingMap())
      .keyBy("id")
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      .apply(new TimeWindowTest());
So when an event e1 arrives at the map operator, it is stored in the ValueState, and e1 is only forwarded once the next element e2 arrives, about 5 seconds later. This produces the following output, in which one element always lies a few seconds before the start of its window:
Differenz: -5013.0
Differenz: -5014.0
Zeitfenster: 1484564690000 - 1484564700000 (Window times start and end)
[1484564686236, 1484564691260]


Differenz: -5009.0
Differenz: -5007.0
Zeitfenster: 1484564700000 - 1484564710000
[1484564696273, 1484564701287]


Differenz: -5005.0
Differenz: -5014.0
Zeitfenster: 1484564710000 - 1484564720000
[1484564706296, 1484564711303]

Best,
Nico
 

2017-01-09 16:10 GMT+01:00 Aljoscha Krettek <[hidden email]>:
Hi,
I'm assuming you also have the call to assignTimestampsAndWatermarks() somewhere in there as well, as in:

stream
      .assignTimestampsAndWatermarks(new TimestampGenerator()) // or somewhere else in the pipeline
      .keyBy("id")
      .map(...)
      .filter(...)
      .map(...)
      .keyBy("areaID")
      .map(new KeyExtractor())
      .keyBy("f1.areaID","f0.sinterval")
      .window(TumblingEventTimeWindows.of(Time.seconds(20)))
      .apply(new TrafficInformation());

Just checking, to make sure. If you have this, we might have to dig a little deeper. Could you also please try to print the whole output of your apply() method in one go, i.e. collect all the output in a String and then make a single call to System.out.println()? It could be that the output in the terminal is not completely in order.
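A minimal sketch of that suggestion in plain Java (`WindowReport` and `formatWindow` are hypothetical names, not Flink API). Within one JVM, `PrintStream.println(String)` writes the string and the line separator while holding the stream's lock, so the lines of one report are not interleaved with other println calls; it does not order reports across windows or parallel subtasks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: build the complete output for one window first,
// then print it with a single println() call.
final class WindowReport {

    static String formatWindow(long start, long end, List<Long> timestamps) {
        // Sort a copy so the report does not depend on iteration order.
        List<Long> sorted = new ArrayList<>(timestamps);
        Collections.sort(sorted);

        StringBuilder sb = new StringBuilder();
        sb.append("Start: ").append(start).append(" - End: ").append(end);
        for (long ts : sorted) {
            sb.append('\n').append("timestamp=").append(ts);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Inside WindowFunction.apply() the arguments would come from
        // timeWindow.getStart(), timeWindow.getEnd() and the window's elements.
        String report = formatWindow(1482332960000L, 1482332980000L,
                Arrays.asList(1482332963995L, 1482332958929L));
        System.out.println(report); // one call, one contiguous block of lines
    }
}
```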

Cheers,
Aljoscha

On Mon, 2 Jan 2017 at 15:04 Nico <[hidden email]> wrote:
Hi Aljoscha,

Thank you for having a look. Actually, there is not much code that deals with timestamps:

stream
      .keyBy("id")
      .map(...)
      .filter(...)
      .map(...)
      .keyBy("areaID")
      .map(new KeyExtractor())
      .keyBy("f1.areaID","f0.sinterval")
      .window(TumblingEventTimeWindows.of(Time.seconds(20)))
      .apply(new TrafficInformation());

The map functions only enrich the data and don't change anything related to the timestamp.

The apply function is:

@Override
public void apply(
        Tuple key,
        TimeWindow timeWindow,
        Iterable<Tuple2<DirectionInterval, Car>> cars,
        Collector<Tuple3<String, Double, Double>> out) throws Exception {

    System.out.println("Start: " + timeWindow.getStart());
    System.out.println("End: " + timeWindow.getEnd());

    for (Tuple2<DirectionInterval, Car> t : cars) {
        System.out.println(t.f1);
    }
}

System.out.println(t.f1) prints all information about a car, including its timestamp field. Flink gets the timestamp from this class:

public class TimestampGenerator extends BoundedOutOfOrdernessTimestampExtractor<Car> {

    public TimestampGenerator(Time maxOutOfOrderness) {
        super(maxOutOfOrderness);
    }

    @Override
    public long extractTimestamp(Car car) {
        return car.getTimestamp();
    }
}


Example output is presented in the previous post... it looks like the timestamp is rounded... I am confused :-/
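The original question assumed that elements in the "wrong" window must be late and would therefore be dropped. The following is a simplified model, not Flink's classes, of the two rules involved: an extractor like TimestampGenerator reports a watermark of roughly the highest timestamp seen so far minus maxOutOfOrderness, and a window [start, end) only rejects elements once end - 1 + allowedLateness <= watermark (details differ between Flink versions, and real watermarks are emitted periodically rather than per element). `LatenessSketch` is a hypothetical name. Both rules use the timestamp Flink attached to the record, not the value printed from car.getTimestamp().

```java
// Simplified model (assumption, not Flink code) of:
//  1. a BoundedOutOfOrdernessTimestampExtractor-style watermark:
//     watermark = highest timestamp seen so far - maxOutOfOrderness
//  2. the lateness check for a window [start, end):
//     late once (end - 1) + allowedLateness <= watermark
final class LatenessSketch {

    private final long maxOutOfOrdernessMs;
    private long maxSeenTimestamp = Long.MIN_VALUE;

    LatenessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Records an element's timestamp and returns the resulting watermark
    // (it never goes backwards, because maxSeenTimestamp never decreases).
    long onElement(long timestamp) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, timestamp);
        return maxSeenTimestamp - maxOutOfOrdernessMs;
    }

    static boolean isWindowLate(long windowEnd, long allowedLatenessMs, long watermark) {
        return windowEnd - 1 + allowedLatenessMs <= watermark;
    }

    public static void main(String[] args) {
        LatenessSketch gen = new LatenessSketch(0L);
        gen.onElement(1485446276040L);
        long wm = gen.onElement(1485446281045L);
        // Window ending at 1485446280000 is already behind the watermark ...
        System.out.println(isWindowLate(1485446280000L, 0L, wm));      // true
        // ... but still accepts elements with 10 s allowed lateness.
        System.out.println(isWindowLate(1485446280000L, 10_000L, wm)); // false
    }
}
```

In this model, an element that carries the timestamp of the newest element seen so far essentially never targets a window that is already closed, which would be consistent with allowedLateness making no difference.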

Best,
Nico

2016-12-23 19:41 GMT+01:00 Aljoscha Krettek <[hidden email]>:
Hi,
could you please share code (and example data) for producing this output. I'd like to have a look.

Cheers,
Aljoscha

On Wed, 21 Dec 2016 at 16:29 Nico <[hidden email]> wrote:
Hi @all,

I am using TumblingEventTimeWindows.of(Time.seconds(20)) for testing, and I found behavior in the assignment of events that is strange (at least to me).

The first element of each new window actually always belongs to the previous window. I thought the events were late, but then they would have been dropped instead of being assigned to the new window. Even with an allowedLateness of 10 s, the behavior stays the same.

I used timeWindow.getStart() and getEnd() to get the boundaries of the window.

Can someone explain this? 

Best,
Nico


TimeWindows with Elements: 

Start: 1482332940000 - End: 1482332960000
timestamp=1482332952907

Start: 1482332960000 - End: 1482332980000
timestamp=1482332958929
timestamp=1482332963995
timestamp=1482332969027
timestamp=1482332974039

Start: 1482332980000 - End: 1482333000000
timestamp=1482332979059
timestamp=1482332984072
timestamp=1482332989081
timestamp=1482332994089

Start: 1482333000000 - End: 1482333020000
timestamp=1482332999113
timestamp=1482333004123
timestamp=1482333009132
timestamp=1482333014144
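Working through the window arithmetic for this data shows where the contradiction lies. A minimal sketch, assuming a zero window offset and non-negative epoch-millisecond timestamps, for which a tumbling window of size s covers [ts - ts % s, ts - ts % s + s) with an exclusive end; `TumblingWindowCheck` is a hypothetical name:

```java
// Assumes zero window offset and non-negative timestamps: a tumbling window of
// sizeMs then covers [ts - ts % sizeMs, ts - ts % sizeMs + sizeMs), end exclusive.
final class TumblingWindowCheck {

    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    public static void main(String[] args) {
        long size = 20_000L; // TumblingEventTimeWindows.of(Time.seconds(20))
        // The first value listed under each window in the output above.
        long[] firstListed = {1482332958929L, 1482332979059L, 1482332999113L};
        for (long ts : firstListed) {
            long start = windowStart(ts, size);
            System.out.println(ts + " -> [" + start + ", " + (start + size) + ")");
        }
    }
}
```

Each of the three values belongs to the window before the one it is printed under; for example, 1482332958929 maps to [1482332940000, 1482332960000). Since the window assigner works from the timestamp Flink carries with each record, the printed field cannot be the timestamp Flink used for these records.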




Re: Events are assigned to wrong window

nico-2
Hi Aljoscha,

Got it!!! :) Thank you. So, in order to retain the "original" timestamps, it would be necessary to assign the timestamps after the MapFunction instead of at the Kafka source? At least, this solves the issue in the example.

Best,
Nico

2017-01-27 11:49 GMT+01:00 Aljoscha Krettek <[hidden email]>:
Now I see. What you're doing in this example is basically reassigning timestamps to other elements in your stateful MapFunction. Flink internally keeps track of the timestamp of each element, and this normally cannot be changed except by using a TimestampAssigner, which you're doing. Now, the output of a MapFunction has the same timestamp as the input element. By keeping an element in state and emitting it when the next element arrives, you emit it with the timestamp of that next element, and that's the reason why they end up in the "wrong" windows.

Does that help?

-
Aljoscha 

On Thu, 26 Jan 2017 at 19:17 Nico <[hidden email]> wrote:
Hi,

Can anyone help me with this problem? I don't get it. Forget the examples below: I've created a copy-and-paste example that reproduces the incorrect results I get when using key-value state and the window operator.


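import java.util.HashSet;
import java.util.Set;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<Tuple2<String, Long>> stream = env.fromElements(
                new Tuple2<>("1", 1485446260994L),
                new Tuple2<>("1", 1485446266012L),
                new Tuple2<>("1", 1485446271031L),
                new Tuple2<>("1", 1485446276040L),
                new Tuple2<>("1", 1485446281045L),
                new Tuple2<>("1", 1485446286049L),
                new Tuple2<>("1", 1485446291062L),
                new Tuple2<>("1", 1485446296066L),
                new Tuple2<>("1", 1485446302019L)
        );

        stream
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(Tuple2<String, Long> stringLongTuple2) {
                        return stringLongTuple2.f1;
                    }
                })
                .keyBy("f0")
                .map(new MapTest())
                .keyBy("f0")
                .window(TumblingEventTimeWindows.of(Time.seconds(20)))
                .apply(new WindowFunction<Tuple2<String, Long>, Object, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<String, Long>> iterable, Collector<Object> collector) throws Exception {

                        Set<Long> set = new HashSet<>();
                        for (Tuple2<String, Long> t : iterable) {
                            set.add(t.f1);
                        }

                        StringBuilder sb = new StringBuilder();

                        sb.append("Window [" + timeWindow.getStart() + " " + timeWindow.getEnd() + "] ");
                        sb.append("Set " + set.toString());
                        System.out.println(sb.toString());
                    }
                })
                .print();

        // execute program
        env.execute("Flink Streaming Java API Skeleton");
    }

    // Emits the previously stored element (or the input itself the first time)
    private static class MapTest extends RichMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

        private transient ValueState<Tuple2<String, Long>> state;

        @Override
        public Tuple2<String, Long> map(Tuple2<String, Long> stringLongTuple2) throws Exception {

            Tuple2<String, Long> t = state.value();

            state.update(stringLongTuple2);

            if (t == null) return stringLongTuple2;

            return t;
        }

        @Override
        public void open(Configuration parameters) throws Exception {

            ValueStateDescriptor<Tuple2<String, Long>> vsd = new ValueStateDescriptor<>(
                    "lastEvent",
                    TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}),
                    null
            );

            state = getRuntimeContext().getState(vsd);
        }
    }
}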


Output:

Window [1485446260000 1485446280000] Set [1485446271031, 1485446260994, 1485446266012]
Window [1485446280000 1485446300000] Set [1485446291062, 1485446281045, 1485446286049, 1485446276040]
Window [1485446300000 1485446320000] Set [1485446296066]
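This output can be reproduced without Flink. The sketch below is a simplified model built on Aljoscha's explanation, not Flink's implementation: `BufferedMapModel` and `simulate` are hypothetical names, watermarks and lateness are ignored, and only the window start is computed (zero offset). With `reassignAfterMap = false` it yields the same three sets as the output above; with `true`, which models calling assignTimestampsAndWatermarks() on the output of `.map(new MapTest())`, each value lands in the window of its own f1.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Plain-Java model of the reproducer; no Flink classes are involved.
// Assumptions: every record carries a timestamp separate from its value,
// a map operator's output keeps the timestamp of the record that went in,
// and the tumbling window is chosen from that carried timestamp only.
final class BufferedMapModel {

    // reassignAfterMap = false: timestamps assigned only at the source (the original job).
    // reassignAfterMap = true:  timestamp re-extracted from the emitted value after the map.
    static Map<Long, Set<Long>> simulate(long[] values, long windowSizeMs, boolean reassignAfterMap) {
        Map<Long, Set<Long>> windows = new TreeMap<>();
        Long previous = null; // plays the role of MapTest's ValueState

        for (long value : values) {
            long sourceTimestamp = value; // extracted from f1 at the source
            // MapTest returns the stored value, or the input itself the first time.
            long emitted = (previous == null) ? value : previous;
            previous = value;

            long carried = reassignAfterMap ? emitted : sourceTimestamp;
            long windowStart = carried - (carried % windowSizeMs);
            windows.computeIfAbsent(windowStart, k -> new TreeSet<>()).add(emitted);
        }
        return windows; // the last value is still held in "state" and never emitted
    }

    public static void main(String[] args) {
        long[] f1 = {1485446260994L, 1485446266012L, 1485446271031L,
                     1485446276040L, 1485446281045L, 1485446286049L,
                     1485446291062L, 1485446296066L, 1485446302019L};
        System.out.println(simulate(f1, 20_000L, false)); // same sets as the Flink output
        System.out.println(simulate(f1, 20_000L, true));  // each value in its own window
    }
}
```

Two side effects of the reassigning variant show up in the model: the first value is emitted twice (MapTest returns it for both the first and the second input), and 1485446302019 is never emitted because no later element pushes it out of the state. If a stateful map can emit values out of timestamp order, as BearingMap's swap branch can, the maxOutOfOrderness of the downstream extractor would need to cover that.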

Best,
Nico

BTW ... I am using Flink 1.1.3.



Re: Events are assigned to wrong window

Aljoscha Krettek
Yes, that's true.
