Hello Fabian (and others),
Sorry to bring up the same flogged topic of CountWindowAll() but I just want to be sure that I understand it right.

For a dataset like the following (partial):

-----------------------------------------
probe-f076c2b0,201,842.53,75.5372,1448028160,29.37
probe-dccefede,199,749.25,78.6057,1448028160,27.46
probe-f29f9662,199,821.81,81.7831,1448028160,22.35
probe-5dac1d9f,195,870.71,83.1028,1448028160,15.98
probe-6c75cfbe,198,830.06,82.5607,1448028160,30.02
probe-4d78b545,204,778.42,78.412,1448028160,25.92
probe-400c5cdf,204,711.65,73.585,1448028160,27.18
...........
-----------------------------------------

The following code:

-----------------------------------------
val env = StreamExecutionEnvironment.createLocalEnvironment(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
env.setParallelism(1)

val readings =
  readIncomingReadings(env,"./sampleIOTTiny.csv")
  .map(e => (e.sensorUUID,e.ambientTemperature))
  .countWindowAll(4,1)
  .maxBy(1)

readings.print
-------------------------------------------

produces this (partial):

------------------------------------------
(probe-f076c2b0,29.37)
(probe-f076c2b0,29.37)
(probe-f076c2b0,29.37)
(probe-f076c2b0,29.37)
(probe-6c75cfbe,30.02)
(probe-6c75cfbe,30.02)
(probe-6c75cfbe,30.02)
(probe-6c75cfbe,30.02)
(probe-400c5cdf,27.18)
......
------------------------------------------

I am trying to justify the first three lines of the output. When I call CountWindowAll(4,1), don't I instruct Flink that 'wait till you get at least first 4 readings before you calculate the maximum'? It appears that Flink is calculating max() for every incoming tuple it is adding to the window. What is the correct and complete interpretation of the computation then?

-- N

--
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is where they should be.
Now put the foundation under them."
Hi Nirmalya,
when using count windows, the window triggers each time “slide-size” elements have been received. Since slide-size is set to 1 in your example, it emits a new max for every element received. Once it has accumulated 4 elements, it starts removing the oldest element for every new element that arrives, before computing the max.

Cheers,
Aljoscha

> On 14 Dec 2015, at 02:55, Nirmalya Sengupta <[hidden email]> wrote:
>
> When I call CountWindowAll(4,1), don't I instruct Flink that 'wait till you get at least first 4 readings before you calculate the maximum'? It appears that Flink is calculating max() for every incoming tuple it is adding to the window. What is the correct and complete interpretation of the computation then?
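To make the trigger-and-evict cycle described in the reply concrete, here is a minimal sketch in plain Scala that replays it on the seven readings posted above. It does not use Flink at all; `CountWindowSim` and `slidingCountMax` are hypothetical names made up for illustration, and the sketch only assumes the semantics stated in this thread (fire every `slide` elements, keep at most the last `size` elements).

```scala
object CountWindowSim {
  // Hypothetical helper, not part of Flink: replays the behaviour described
  // in this thread for countWindowAll(size, slide) followed by maxBy.
  def slidingCountMax(
      readings: Seq[(String, Double)],
      size: Int,
      slide: Int): Seq[(String, Double)] = {
    // The window fires after every `slide` elements seen so far.
    val fireAt = slide to readings.length by slide
    fireAt.map { seen =>
      // Only the last `size` elements are kept; older ones are evicted.
      val window = readings.slice(math.max(0, seen - size), seen)
      window.maxBy(_._2)
    }
  }

  def main(args: Array[String]): Unit = {
    // (sensorUUID, ambientTemperature) pairs taken from the sample in the question.
    val readings = Seq(
      "probe-f076c2b0" -> 29.37,
      "probe-dccefede" -> 27.46,
      "probe-f29f9662" -> 22.35,
      "probe-5dac1d9f" -> 15.98,
      "probe-6c75cfbe" -> 30.02,
      "probe-4d78b545" -> 25.92,
      "probe-400c5cdf" -> 27.18
    )
    slidingCountMax(readings, size = 4, slide = 1).foreach(println)
  }
}
```

With size 4 and slide 1, the first four firings see windows of 1, 2, 3 and 4 elements, all containing probe-f076c2b0, which is why its reading is reported four times before probe-6c75cfbe takes over.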
Hello Aljoscha <[hidden email]>,
Thanks for the explanation about the semantics of CountWindowAll's parameters.

However, I am thinking about it and what strikes me is this:

If I call CountWindowAll(10,5) then what I am instructing Flink to do is to

1) Collect first 10
2) Call max() function

and, *then* begin the slider and

3) take 1 out and 1 in so that count of elements in the window remains 10 (or less if fewer than 5 events appear)
4) Call max function with this (n -1 + 1) elements

Isn't this the way that API is to be understood?

What we observe instead is that the max function is called once after the first 5 arrive, and once more after the second 5 arrive. So, the first max misleads because my intention is to find max of 10 elements and never of 5 elements.

Please make me wiser.

-- Nirmalya
Hi,
the current behavior is in fact that the window will be triggered every “slide-size” elements and the computation will take into account the last “window-size” elements. So for a window with window-size 10 and slide-size 5 the window will be triggered every 5 elements. This means that your observation is correct: the first window will only contain the first 5 elements, the next one will contain the first 10 elements, the one after that will contain elements 6 to 15, and so on.

Cheers,
Aljoscha

> On 14 Dec 2015, at 14:24, Nirmalya Sengupta <[hidden email]> wrote:
>
> What we observe instead, is that max function is called once after first 5 arrive, and once more after second 5 arrive. So, the first max misleads because my intention is to find max of 10 elements and never of 5 elements.
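For anyone who wants the "wait until the window is full" behaviour asked about here, the difference can be sketched in plain Scala. This is only an illustration of the two semantics, not Flink code: `CountWindowFirings`, `firings` and `fullFiringsOnly` are hypothetical names, and how to express the second variant with Flink's own trigger API is not covered in this thread.

```scala
object CountWindowFirings {
  // Hypothetical helper, not a Flink API: lists the elements each firing sees
  // when the window fires every `slide` elements and keeps the last `size`.
  def firings[A](elements: Seq[A], size: Int, slide: Int): Seq[Seq[A]] =
    (slide to elements.length by slide).map { seen =>
      elements.slice(math.max(0, seen - size), seen)
    }

  // The behaviour the question expected: suppress every firing that
  // happens before `size` elements are available.
  def fullFiringsOnly[A](elements: Seq[A], size: Int, slide: Int): Seq[Seq[A]] =
    firings(elements, size, slide).filter(_.length == size)

  def main(args: Array[String]): Unit = {
    val elements = 1 to 15
    println("current behaviour:")
    firings(elements, size = 10, slide = 5)
      .foreach(w => println(s"  ${w.head}..${w.last} (${w.length} elements)"))
    println("full windows only:")
    fullFiringsOnly(elements, size = 10, slide = 5)
      .foreach(w => println(s"  ${w.head}..${w.last} (${w.length} elements)"))
  }
}
```

For 15 elements with size 10 and slide 5, the first variant fires on elements 1..5, 1..10 and 6..15, while the second drops the partial 1..5 firing and reports only 1..10 and 6..15.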
Hello Aljoscha <[hidden email]>,
Thanks again for taking the time to explain the behaviour of CountWindowAll(m,n). To be honest, the behaviour seems a bit sketchy to me - and probably it needs a revisit - but if that's the way it is, then that's the way it is! :-)

-- Nirmalya
Hi,
I believe this question might have been asked before - so sorry for repeating it (I just did not find the discussion on the mailing list).
Is it possible somehow to create a new DataStream from the elements that are evicted from a window?
A simple use case for this is:
We have data coming from a sensor every second. We want to continuously compute the average over the last 5 seconds and over the interval from 10 seconds ago until 5 seconds ago.
I would be interested in how the data evicted from the main window, which keeps the fresh data, could be fed into a new stream on which I could again apply a window of 5 seconds.
(Having a 10-second window and selecting only the oldest 5 seconds of data is not a viable option.)

Regards,
Hi,
I’m afraid this is not possible right now. I’m also not sure about the Evictors as a whole. Using them makes window operations very slow because all elements in a window have to be kept, i.e. window results cannot be pre-aggregated.

Cheers,
Aljoscha

> On 15 Dec 2015, at 12:23, Radu Tudoran <[hidden email]> wrote:
>
> Is it possible somehow to create a new DataStream from the elements that are evicted from a window?
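Since the thread says Flink cannot expose evicted elements as a stream right now, the idea from the question can at least be sketched outside Flink. The following is plain Scala, not a Flink operator: `CascadedWindows`, `Reading` and `TwoStageAverager` are hypothetical names, and the sketch assumes readings arrive in timestamp order. Elements evicted from the "fresh" buffer are handed to an "older" buffer, and both buffers keep every element, which is the cost of eviction mentioned in the reply.

```scala
import scala.collection.mutable

object CascadedWindows {
  final case class Reading(timestampSec: Long, value: Double)

  // Hypothetical class, plain Scala rather than a Flink operator.
  // Assumes readings are added in non-decreasing timestamp order.
  final class TwoStageAverager(spanSec: Long) {
    private val fresh = mutable.Queue.empty[Reading] // the last `spanSec` seconds
    private val older = mutable.Queue.empty[Reading] // the `spanSec` seconds before that

    /** Adds a reading and returns (average of fresh, average of older); None if a buffer is empty. */
    def add(r: Reading): (Option[Double], Option[Double]) = {
      fresh.enqueue(r)
      // Readings that fall out of the fresh span are handed to the older buffer.
      while (fresh.nonEmpty && fresh.head.timestampSec <= r.timestampSec - spanSec)
        older.enqueue(fresh.dequeue())
      // Readings that fall out of the older span are dropped for good.
      while (older.nonEmpty && older.head.timestampSec <= r.timestampSec - 2 * spanSec)
        older.dequeue()
      (average(fresh), average(older))
    }

    private def average(q: mutable.Queue[Reading]): Option[Double] =
      if (q.isEmpty) None else Some(q.map(_.value).sum / q.size)
  }

  def main(args: Array[String]): Unit = {
    val averager = new TwoStageAverager(spanSec = 5)
    // One reading per second, with the value equal to the timestamp.
    (1 to 12).foreach { t =>
      println(s"t=$t -> ${averager.add(Reading(t.toLong, t.toDouble))}")
    }
  }
}
```

Feeding one reading per second, the second average stays empty until the first reading is older than 5 seconds, after which both averages are updated on every new reading.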