Explanation of the output of timeWindowAll(Time.milliseconds(3))


Explanation of the output of timeWindowAll(Time.milliseconds(3))

nsengupta
Hello Fabian <[hidden email]>

Merry Christmas to you and everyone else in this forum.

Another neophyte's question, patience please.

I have the following code:

    val env = StreamExecutionEnvironment.createLocalEnvironment(1)
  
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val readings =
      readIncomingReadings(env,"./sampleIOT.csv")
      .map(e => (e.timeStamp,e.ambientTemperature))
      .assignAscendingTimestamps(pairOfReadings => pairOfReadings._1)
      .timeWindowAll(Time.milliseconds(3))
      .maxBy(1)


In the data file, the timestamp is the second field from the right (first few records only):

probe-42a9ddca,193,819.12,74.3712,1448028161,22.07
probe-252a5bbd,197,710.32,80.6072,1448028161,14.64
probe-987f2cb6,200,750.4,76.0533,1448028161,14.72
probe-24444323,197,816.06,84.0816,1448028161,4.405
probe-6dd6fdc4,201,717.64,78.4031,1448028161,29.43
probe-20c609fb,204,804.37,84.5243,1448028161,22.87
probe-c027fdc9,195,858.61,81.7682,1448028161,24.47
probe-2c6cd3de,198,826.96,85.26,1448028162,18.99
probe-960906ca,197,797.63,77.4359,1448028162,27.62
probe-16226f9e,199,835.5,81.2027,1448028162,18.82
probe-4de4e64b,200,851.04,80.5296,1448028162,27.43
.......
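
For reference, a minimal sketch of what a readIncomingReadings helper could look like for this file layout. The case class and the names of the unused middle fields are assumptions for illustration; only timeStamp and ambientTemperature are taken from the code above.

    import org.apache.flink.streaming.api.scala._

    // Hypothetical record type; field names other than timeStamp and
    // ambientTemperature are assumptions about the unused columns.
    case class IoTReading(probeId: String,
                          field2: Int,
                          field3: Double,
                          field4: Double,
                          timeStamp: Long,
                          ambientTemperature: Double)

    def readIncomingReadings(env: StreamExecutionEnvironment,
                             path: String): DataStream[IoTReading] =
      env.readTextFile(path).map { line =>
        val f = line.split(",")
        IoTReading(f(0), f(1).toInt, f(2).toDouble, f(3).toDouble,
                   f(4).toLong, f(5).toDouble)
      }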


The output is:

(1448028163,27.83)
(1448028166,32.06)
(1448028160,30.02)

The contents are correct, but I am not sure about the order in which they appear. Because I am using 

val env = StreamExecutionEnvironment.createLocalEnvironment(1)  // only one thread anyway


and the timestamps are guaranteed to be in ascending order (I sorted the CSV before using it), my expectation is that Flink should print the output as:

(1448028160,30.02)
(1448028163,27.83)
(1448028166,32.06)

How do I explain the randomness?

-- Nirmalya

--
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is where they should be.
Now put the foundation under them."

Re: Explanation of the output of timeWindowAll(Time.milliseconds(3))

Fabian Hueske-2
Hi Nirmalya,

Event-time actions (such as the trigger that computes a window) fire when a watermark is received that is larger than the trigger's timestamp. By default, watermarks are emitted at a fixed interval, i.e., every x milliseconds: whenever that interval elapses, Flink asks the timestamp assigner for the currently valid watermark value and emits it. If a window operator receives a watermark that closes multiple windows at once, the order in which those windows are computed is random.

In your case, you are reading the data from a file, which is very fast, so several windows are already complete when the first watermark is received. The order in which these completed windows are computed and their results emitted is random. With a 3 ms window size your data falls into three windows, and likely all three are already closed by that first watermark, which is why the results can appear in any order.

You can configure the watermark interval with ExecutionConfig.setAutoWatermarkInterval(long milliseconds).
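For example, a minimal sketch (the 1 ms value is purely illustrative, not taken from the original code):

    // emit watermarks every millisecond instead of at the default interval
    env.getConfig.setAutoWatermarkInterval(1L)

A shorter interval causes watermarks, and therefore window results, to be emitted more frequently.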
Alternatively, you can implement a source function that emits watermarks by itself.
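A minimal sketch of such a source, assuming the readings are already available as a sorted in-memory sequence of (timestamp, temperature) pairs; the class name and constructor are illustrative, not from the original code:

    import org.apache.flink.streaming.api.functions.source.SourceFunction
    import org.apache.flink.streaming.api.watermark.Watermark

    class ReadingsSource(readings: Seq[(Long, Double)])
        extends SourceFunction[(Long, Double)] {

      @volatile private var running = true

      override def run(ctx: SourceFunction.SourceContext[(Long, Double)]): Unit = {
        for (reading <- readings if running) {
          // attach the event timestamp to each emitted record ...
          ctx.collectWithTimestamp(reading, reading._1)
          // ... and advance event time explicitly; the "- 1" mirrors the
          // ascending-timestamp extractor, which still admits further
          // elements carrying the current timestamp
          ctx.emitWatermark(new Watermark(reading._1 - 1))
        }
      }

      override def cancel(): Unit = {
        running = false
      }
    }

With such a source each window is closed as soon as event time passes its end, so the three results above should be printed in window order.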

Best,
Fabian


