Hello Fabian,
Merry Christmas to you and everyone else in this forum. Another neophyte's question; patience, please. I have the following code:

val env = StreamExecutionEnvironment.createLocalEnvironment(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val readings = readIncomingReadings(env, "./sampleIOT.csv")
  .map(e => (e.timeStamp, e.ambientTemperature))
  .assignAscendingTimestamps(pairOfReadings => pairOfReadings._1)
  .timeWindowAll(Time.milliseconds(3))
  .maxBy(1)

In the data file, the timestamp is the second field from the right (first few records only):

probe-42a9ddca,193,819.12,74.3712,1448028161,22.07
probe-252a5bbd,197,710.32,80.6072,1448028161,14.64
probe-987f2cb6,200,750.4,76.0533,1448028161,14.72
probe-24444323,197,816.06,84.0816,1448028161,4.405
probe-6dd6fdc4,201,717.64,78.4031,1448028161,29.43
probe-20c609fb,204,804.37,84.5243,1448028161,22.87
probe-c027fdc9,195,858.61,81.7682,1448028161,24.47
probe-2c6cd3de,198,826.96,85.26,1448028162,18.99
probe-960906ca,197,797.63,77.4359,1448028162,27.62
probe-16226f9e,199,835.5,81.2027,1448028162,18.82
probe-4de4e64b,200,851.04,80.5296,1448028162,27.43
.......

The output is:

(1448028163,27.83)
(1448028166,32.06)
(1448028160,30.02)

The contents are correct, but I am not sure about the order in which they appear. Because I am using

val env = StreamExecutionEnvironment.createLocalEnvironment(1) // only one thread anyway

and the timestamps are guaranteed to be in ascending order (I sorted the CSV before using it), my expectation is that Flink should print the output as:

(1448028160,30.02)
(1448028163,27.83)
(1448028166,32.06)

How do I explain the randomness?
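For reference, readIncomingReadings is a small helper of mine, roughly along the lines below. Only timeStamp and ambientTemperature matter to the job above; the other field names in this sketch are just placeholders for the remaining CSV columns:

import org.apache.flink.streaming.api.scala._

// Placeholder field names except timeStamp and ambientTemperature,
// which the job above actually references.
case class IncomingDataUnit(
  probe: String, reading1: Int, reading2: Double, reading3: Double,
  timeStamp: Long, ambientTemperature: Double)

// Parse each CSV line of the sample file into the case class.
def readIncomingReadings(env: StreamExecutionEnvironment,
                         path: String): DataStream[IncomingDataUnit] =
  env.readTextFile(path).map { line =>
    val f = line.split(",")
    IncomingDataUnit(f(0), f(1).toInt, f(2).toDouble, f(3).toDouble,
      f(4).toLong, f(5).toDouble)
  }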
--
Nirmalya
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is where they should be. Now put the foundation under them."
Hi Nirmalya,

Event-time actions (such as an event-time trigger that computes a window) fire when a watermark is received that is larger than the trigger's timestamp. By default, watermarks are emitted at a fixed time interval, i.e., every x milliseconds; whenever a new watermark is due, Flink asks for the currently valid watermark value. If a window operator receives a single watermark that closes multiple windows, the order in which those windows are computed is random.

In your case, you are reading data from a file, which is very fast, so several windows are already complete by the time the first watermark is received. The order in which these windows are computed and their results emitted is therefore random.

You can configure the watermark interval with ExecutionConfig.setAutoWatermarkInterval(long milliseconds). Alternatively, you can implement a source function that emits watermarks itself, as sketched below.
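A rough, untested sketch of both options follows. The tuple type, the field positions, and the class name are assumptions based on your job and your CSV layout:

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.watermark.Watermark
import scala.io.Source

// Option 1: emit watermarks more frequently, so fewer windows pile up
// behind a single watermark (interval in milliseconds).
env.getConfig.setAutoWatermarkInterval(1L)

// Option 2: a source that assigns timestamps and emits a watermark as soon
// as event time advances. Because your CSV is sorted by timestamp, windows
// then close and fire one at a time, in order.
class SortedReadingsSource(path: String) extends SourceFunction[(Long, Double)] {
  @volatile private var running = true

  override def run(ctx: SourceContext[(Long, Double)]): Unit = {
    var lastTs = Long.MinValue
    for (line <- Source.fromFile(path).getLines() if running) {
      val f = line.split(",")
      val ts   = f(4).toLong     // timestamp: 2nd field from the right
      val temp = f(5).toDouble   // ambient temperature: last field
      ctx.collectWithTimestamp((ts, temp), ts)
      if (ts > lastTs) {
        // Input is sorted, so no element with a timestamp <= ts - 1 follows.
        ctx.emitWatermark(new Watermark(ts - 1))
        lastTs = ts
      }
    }
  }

  override def cancel(): Unit = { running = false }
}

With option 2 you would create the stream via env.addSource(new SortedReadingsSource("./sampleIOT.csv")) and drop the assignAscendingTimestamps call, since the source already assigns timestamps and watermarks.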
Best, Fabian