Hello Fabian/Matthius,
Many thanks for showing interest in my query on SOF. That helps me sustain my enthusiasm. :-)

After setting the parallelism of the environment to '1' and replacing _max()_ with _maxBy()_, I get a list of maximum temperatures, but I fail to explain to myself how Flink arrives at those figures (attached below). I understand that different runs will possibly generate different results, because I am using the **ProcessingTime** characteristic. Yet, I expect some kind of deterministic output, which I don't see. Please point me in the right direction.

Here's the code I have been referring to:

-------------------------------------------------
case class IncomingDataUnit (
  sensorUUID: String, radiationLevel: Int, photoSensor: Float,
  humidity: Float, timeStamp: Long, ambientTemperature: Float
) extends Serializable

object SocketTextStreamWordCount {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    env.setParallelism(1)

    val readings = readIncomingReadings(env, "./sampleIOTTiny.csv")
      .map(e => (e.sensorUUID, e.ambientTemperature))
      .timeWindowAll(Time.of(5, TimeUnit.MILLISECONDS))
      .trigger(CountTrigger.of(5))
      .evictor(CountEvictor.of(4))
      .maxBy(1)

    readings.print

    env.execute("Scala IOT Stream experiment Example")
  }

  private def readIncomingReadings(env: StreamExecutionEnvironment, inputPath: String): DataStream[IncomingDataUnit] = {
    env.readTextFile(inputPath).map(datum => {
      val fields = datum.split(",")
      IncomingDataUnit(
        fields(0),         // sensorUUID
        fields(1).toInt,   // radiationLevel
        fields(2).toFloat, // photoSensor
        fields(3).toFloat, // humidity
        fields(4).toLong,  // timeStamp
        fields(5).toFloat  // ambientTemperature
      )
    })
  }
}
-------------------------------------------------

Here's the dataset:

------------------------------------------------
probe-f076c2b0,201,842.53,75.5372,1448028160,29.37
probe-dccefede,199,749.25,78.6057,1448028160,27.46
probe-f29f9662,199,821.81,81.7831,1448028160,22.35
probe-5dac1d9f,195,870.71,83.1028,1448028160,15.98
probe-6c75cfbe,198,830.06,82.5607,1448028160,30.02
probe-4d78b545,204,778.42,78.412,1448028160,25.92
probe-400c5cdf,204,711.65,73.585,1448028160,22.18
probe-df2d4cad,199,820.8,72.936,1448028161,16.18
probe-f4ef109e,199,785.68,77.5647,1448028161,16.36
probe-3fac3350,200,720.12,78.2073,1448028161,19.19
probe-42a9ddca,193,819.12,74.3712,1448028161,22.07
probe-252a5bbd,197,710.32,80.6072,1448028161,14.64
probe-987f2cb6,200,750.4,76.0533,1448028161,14.72
probe-24444323,197,816.06,84.0816,1448028161,4.405
probe-6dd6fdc4,201,717.64,78.4031,1448028161,29.43
probe-20c609fb,204,804.37,84.5243,1448028161,22.87
probe-c027fdc9,195,858.61,81.7682,1448028161,24.47
probe-2c6cd3de,198,826.96,85.26,1448028162,18.99
probe-960906ca,197,797.63,77.4359,1448028162,27.62
-------------------------------------------------

And here's the output:

---------------------
(probe-6c75cfbe,30.02)
(probe-42a9ddca,22.07)
(probe-960906ca,27.62)
(probe-400c5cdf,22.18)
(probe-f076c2b0,29.37)
(probe-6c75cfbe,30.02)
(probe-960906ca,27.62)
---------------------

-- Nirmalya
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta "If you have built castles in the air, your work need not be lost. That is where they should be. Now put the foundation under them." |
Hi Nirmalya,

can you describe the semantics that you want to implement?

Cheers, Fabian

2015-11-27 14:49 GMT+01:00 Nirmalya Sengupta <[hidden email]>:
|
In reply to this post by nsengupta
Hello Fabian,
A little long mail; please have some patience.

From your response: ' Let's start by telling me what you actually want to do ;-) '

At a broad level, I want to write a tutorial (perhaps a series of them) on Flink, where these concepts are brought out by a mix of definition, elaboration, illustration and, of course, code snippets. If that helps the community, I will be very happy. At the least, I will understand the principles and their application much better. So, I am a bit selfish here perhaps. You also mention that you are preparing some such material. If I can complement your effort, I will be delighted. One never knows: going further, I may become a trainer / evangelist of Apache Flink, if I show enough grasp of the subject! :-)

Now to this particular question (from SOF): When I began, my intention was to find the maximum temperature for every 5 successive records (to answer your question). As I have said before, I am learning and hence trying various operator combinations on the same set of data to see what happens, and then trying to explain why that happens.

Let's refer to the code again:

val readings = readIncomingReadings(env,"./sampleIOTTiny.csv")
  .map(e => (e.sensorUUID,e.ambientTemperature))
  .timeWindowAll(Time.of(5,TimeUnit.MILLISECONDS))
  .trigger(CountTrigger.of(5))
  .evictor(CountEvictor.of(4))
  .maxBy(1)

So, what I understand is this:

timeWindowAll defines a pane of 5 msecs. When this time expires, timeWindowAll fires a kind of *onExpirationOf* trigger (I have fabricated the name; quite likely it doesn't exist). This perhaps does nothing other than passing to the function (here, maxBy()) the contents of the window (whatever number of elements have been collected in the last 5 msecs) and clearing the pane, readying it for the next 5 msecs (not exactly, but more on that later).

However, I provide a CountTrigger (5 elements). According to the rules of Flink, this trigger replaces the aforementioned default onExpirationOf trigger.
Therefore, when timeWindowAll is ready after 5 msecs have passed, what it finds available to fire is this CountTrigger. However, a CountTrigger can fire only if its count-related criterion (here, 5) is satisfied. So, after 5 msecs have passed, only if the number of elements collected in the timeWindowAll pane is >= 5 will the CountTrigger fire; otherwise, the CountTrigger will not stir and timeWindowAll will shrug its shoulders and go back to waiting for the next 5 msecs period.

Going further, I provide a CountEvictor. According to the rules of Flink, an Evictor is allowed to act only when its associated trigger (here, CountTrigger) is fired. Because of its presence, a further check is made on the contents of the pane. If the CountTrigger is fired, the number of elements collected in the pane must be >= 5. However, the evictor is interested only in the first 4 of them. The evictor takes away these 4 from timeWindowAll's pane and gives them to the function. The 5th element still remains in the pane. timeWindowAll readies itself for the next 5 msecs, but its pane is not empty this time. It still has that solitary element there.

This much seems straightforward, but there is a twist in the tale.

A very important point about timeWindowAll's pane is its ephemeral nature. When I specify timeWindowAll(Time.of(5,TimeUnit.MILLISECONDS)), Flink understands this as an instruction to create a pane for every successive 5 msecs period. Flink doesn't create one pane which is cleared after every 5 msecs (this is what I inexactly mentioned earlier) and readied for the next 5 msecs. Instead, it brings into existence a freshly minted pane every 5 msecs. The contents of the preceding pane are subjected to the trigger (if it exists), the evictor (if it exists) and finally the function (if it is provided). Then, Flink dumps the preceding pane along with its contents, if any, and readies the new pane, awaiting elements during the next 5 msecs.
In my case above, the criteria of timeWindowAll and Trigger/Evictor are not locked in step as precisely as they should have been. It is quite possible that while the CountTrigger fires because 5 elements are already in the pane, 5 msecs are yet to lapse. So, the current pane of timeWindowAll is still alive and is collecting subsequent elements as they arrive. The evictor takes away 4 elements. The remaining element is joined by a few more before 5 msecs lapse. After 5 msecs have lapsed, Flink extirpates the pane - along with its current contents - and creates a fresh pane. In effect, some elements which arrive and are collected in the pane never reach the trigger/evictor pair. These unfortunate elements are destroyed along with the pane in which they reside.

Obviously, this affects the output that I see. The calculation of maximum temperature is inaccurate simply because some of the temperature readings are never available to the _maxBy_ function.

Have I got it almost correct? Will be keen to hear from you.

-- Nirmalya
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta5 "If you have built castles in the air, your work need not be lost. That is where they should be. Now put the foundation under them." |
Hi Nirmalya, thanks for the detailed description of your understanding of Flink's window semantics. Most of it is correct, but a few things need a bit of correction ;-) Please see my comments inline. 2015-11-28 4:36 GMT+01:00 Nirmalya Sengupta <[hidden email]>:
That sounds great! We are almost done with the blog post and will publish it soon. Looking forward to your feedback :-)
This is correct. timeWindowAll(5 msecs) (without additional trigger definitions) will create a new window every 5 msec, trigger after 5 msecs (call the user function), purge the window, and create a new window. Independent of the trigger, when a window expires, it is removed (including all elements it contains) and a new window is created.
If you define a CountTrigger(5), it will be triggered exactly once, when exactly 5 elements are in the window, even if there are 2 msecs left for the window. This will also replace the current trigger that would trigger at 5 msecs, i.e., the window is only evaluated once, after the 5th element was inserted. It depends on the trigger what happens with the elements in the pane after the function has been called. If you look at the Trigger interface, you'll find that TriggerResult might be FIRE or FIRE_AND_PURGE (among others). FIRE will call the user function and leave the elements in the window. FIRE_AND_PURGE will call the user function, purge (delete) the window, and create a new window within the same time bounds.
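The difference between FIRE and FIRE_AND_PURGE described above can be sketched in plain Scala. This is only a model of the behaviour as stated in this thread, not Flink code: `PaneModel`, `applyResult` and the case objects are hypothetical names chosen to mirror the TriggerResult values, and the "user function" is assumed to be a maximum by temperature, as in the original job.

```scala
object PaneModel {
  // Hypothetical stand-ins for the TriggerResult values mentioned above.
  sealed trait TriggerResult
  case object Continue extends TriggerResult
  case object Fire extends TriggerResult
  case object Purge extends TriggerResult
  case object FireAndPurge extends TriggerResult

  type Reading = (String, Float) // (sensorUUID, ambientTemperature)

  // The "user function" of this model: maximum by temperature, if any.
  private def maxOf(pane: Vector[Reading]): Option[Reading] =
    if (pane.isEmpty) None else Some(pane.maxBy(_._2))

  /** Applies a trigger outcome to a pane.
    * Returns the pane as it looks afterwards and the emitted result, if any. */
  def applyResult(pane: Vector[Reading],
                  result: TriggerResult): (Vector[Reading], Option[Reading]) =
    result match {
      case Continue     => (pane, None)                // nothing happens
      case Fire         => (pane, maxOf(pane))         // evaluate, keep elements
      case Purge        => (Vector.empty, None)        // drop elements, no output
      case FireAndPurge => (Vector.empty, maxOf(pane)) // evaluate, then drop
    }
}
```

In this model, applying `Fire` to a pane returns the same pane together with its maximum, while `FireAndPurge` returns the same maximum and an empty pane.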
When the CountTrigger(5) fires, exactly 5 elements are in the window pane. A CountEvictor(4) will remove the first element from the pane such that only 4 elements remain, before it calls the user function to evaluate the window. It depends on the TriggerResult, what happens with the four elements after the user function was invoked. A CountTrigger keeps the elements in the window.
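The effect of the evictor described above can be modelled in a few lines of plain Scala. This is a sketch of the stated behaviour only (drop the oldest elements until 4 remain, then evaluate), not Flink's implementation; `EvictorModel`, `evictOldest` and `evaluate` are hypothetical names, and the sample values are the first five rows of the dataset posted earlier in the thread.

```scala
object EvictorModel {
  type Reading = (String, Float) // (sensorUUID, ambientTemperature)

  /** Count-evictor behaviour as described above: drop the oldest elements
    * so that at most `keep` remain. */
  def evictOldest(pane: Vector[Reading], keep: Int): Vector[Reading] =
    pane.drop(math.max(0, pane.size - keep))

  /** What the window function sees when the trigger fires: the maximum by
    * temperature over the elements that survive eviction. */
  def evaluate(pane: Vector[Reading], keep: Int): Option[Reading] = {
    val kept = evictOldest(pane, keep)
    if (kept.isEmpty) None else Some(kept.maxBy(_._2))
  }

  // First five readings of the sample file, in arrival order.
  val firstFive: Vector[Reading] = Vector(
    ("probe-f076c2b0", 29.37f),
    ("probe-dccefede", 27.46f),
    ("probe-f29f9662", 22.35f),
    ("probe-5dac1d9f", 15.98f),
    ("probe-6c75cfbe", 30.02f)
  )
}
```

With `keep = 4`, the first reading (`probe-f076c2b0`) is evicted before evaluation. For these five rows the maximum is unaffected, but if the oldest element had carried the highest temperature it would not have been seen by the function at all.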
This is correct.
In fact, there is quite an easy solution for your issue: you should not use a time window but a count window instead:

val readings = readIncomingReadings(env,"./sampleIOTTiny.csv")
  .map(e => (e.sensorUUID,e.ambientTemperature))
  .countWindowAll(5)
  .maxBy(1)

This will give you a tumbling count window that calls maxBy() whenever 5 elements have arrived. See the documentation for details [1].

Please let me know if you have further questions,
Fabian
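To see what a tumbling count window of 5 would yield for the posted dataset, the semantics can be mimicked with ordinary Scala collections. This is a model under stated assumptions, not Flink code: `CountWindowModel` and `tumblingMax` are hypothetical names, records are assumed to arrive in file order, and an incomplete trailing group is assumed never to be evaluated because the count of 5 is not reached.

```scala
object CountWindowModel {
  type Reading = (String, Float) // (sensorUUID, ambientTemperature)

  /** Splits the readings into consecutive groups of `size` and returns the
    * maximum by temperature of every complete group. */
  def tumblingMax(readings: Seq[Reading], size: Int): List[Reading] =
    readings.grouped(size)
      .filter(_.size == size) // assumption: a partial last group never fires
      .map(_.maxBy(_._2))
      .toList

  // The 19 rows of the sample file, reduced to (sensorUUID, ambientTemperature).
  val readings: Vector[Reading] = Vector(
    ("probe-f076c2b0", 29.37f), ("probe-dccefede", 27.46f),
    ("probe-f29f9662", 22.35f), ("probe-5dac1d9f", 15.98f),
    ("probe-6c75cfbe", 30.02f), ("probe-4d78b545", 25.92f),
    ("probe-400c5cdf", 22.18f), ("probe-df2d4cad", 16.18f),
    ("probe-f4ef109e", 16.36f), ("probe-3fac3350", 19.19f),
    ("probe-42a9ddca", 22.07f), ("probe-252a5bbd", 14.64f),
    ("probe-987f2cb6", 14.72f), ("probe-24444323", 4.405f),
    ("probe-6dd6fdc4", 29.43f), ("probe-20c609fb", 22.87f),
    ("probe-c027fdc9", 24.47f), ("probe-2c6cd3de", 18.99f),
    ("probe-960906ca", 27.62f)
  )
}
```

Under these assumptions, `CountWindowModel.tumblingMax(CountWindowModel.readings, 5)` yields three results, one per complete group of five: `probe-6c75cfbe` (30.02), `probe-4d78b545` (25.92) and `probe-6dd6fdc4` (29.43). The last four rows do not form a full window and produce no output.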
|
Sorry, I have to correct myself. The windowing semantics are not easy ;-)
2015-11-30 15:34 GMT+01:00 Fabian Hueske <[hidden email]>:
THIS IS NOT CORRECT --> "Independent of the trigger, when a window expires, it is removed (including all elements it contains) and a new window is created."

In fact, a window is only removed if a trigger returns FIRE_AND_PURGE or PURGE. The default time windows (without additional Trigger) purge their content at their end time. If you apply a trigger that does *not* purge the content of the window after it expires, it will consume memory forever.
|
In reply to this post by nsengupta
Hello Fabian,
Thanks for going through my long mail and for the concise responses. I am just happy that I was not way off the mark in my understanding.

It seems to me that I should rather wait for your blog before asking more questions. Not sure if I will be left with enough drive to write my (planned) blogs, once yours is out. :-)

Yes, your solution works and, as luck would have it, I figured out the same during the weekend, after going through your earlier responses. Thanks.

One question though. You mentioned:

' It depends on the TriggerResult, what happens with the four elements after the user function was invoked. A CountTrigger keeps the elements in the window. '

Could you elaborate on this point a bit? If CountTrigger _keeps_ the elements in the window, who _removes_ them? Are the elements removed by the Trigger's FIRE_AND_PURGE directive or by the Flink runtime, when the current pane is destroyed by the runtime before a new pane is created?

-- Nirmalya
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta "If you have built castles in the air, your work need not be lost. That is where they should be. Now put the foundation under them." |
Hi Nirmalya,

please don't feel discouraged to write blog posts! There are many things you could write about Flink's support for windows, e.g., you could discuss use cases / applications that require advanced window semantics.

Regarding my comment on how/when elements are removed from a window: Elements are only removed if the Trigger returns a PURGE (or FIRE_AND_PURGE) TriggerResult, i.e., the runtime will not remove elements unless the trigger tells it to purge. If you look at the implementation of the CountTrigger, you'll see that it only returns CONTINUE and FIRE. Hence, elements are *never* removed from the window. This is actually quite dangerous and will eventually lead to an OutOfMemoryError, because the window data will be kept in memory forever.

Best, Fabian

2015-11-30 18:59 GMT+01:00 Nirmalya Sengupta <[hidden email]>:
|
In reply to this post by nsengupta
Hello Fabian (<[hidden email]>),
Many thanks for your encouraging words about the blogs. I want to make a sincere attempt.

To summarise my understanding of the rule of removal of the elements from the window (after going through your last mail), here are two corollaries:

1) If my workflow has no triggers (and hence, no evictors), my application will run out of memory, perhaps sooner than I expect.

2) If I am using CountTriggers only (and no Evictors), then too my application will run out of memory, eventually.

Could you please mark them with Yes/No?

I understand why removal of elements from the window is essential. In fact, if older elements are not removed, new elements cannot come in and therefore, the conceptual proposition of a **flowing Stream** is not realized in the right manner.

-- Nirmalya
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta "If you have built castles in the air, your work need not be lost. That is where they should be. Now put the foundation under them." |
Hi Nirmalya,

please find my answers inline.

2015-12-02 3:26 GMT+01:00 Nirmalya Sengupta <[hidden email]>:
No, not necessarily. If you define a regular TimeWindow or CountWindow, Flink will use default triggers and evictors to ensure that data is not unnecessarily accumulated.
Yes, that's right. However, you can wrap the CountTrigger in a PurgingTrigger to purge windows after they have been evaluated.
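The difference a purging wrapper makes can be illustrated with a small plain-Scala model. It is a sketch of the behaviour described in this thread, not Flink's implementation: `PurgingModel`, `CountTriggerModel`, `purging` and `paneSizeAfter` are hypothetical names, and the model assumes that the count restarts after each firing and that the wrapper simply turns every FIRE into FIRE_AND_PURGE.

```scala
object PurgingModel {
  // Hypothetical stand-ins for the trigger outcomes discussed in the thread.
  sealed trait Outcome
  case object Continue extends Outcome
  case object Fire extends Outcome
  case object FireAndPurge extends Outcome

  /** Count trigger of this model: FIRE on every n-th element, else CONTINUE.
    * Assumption: the internal count restarts after each firing. */
  final class CountTriggerModel(n: Int) {
    private var seen = 0
    def onElement(): Outcome = {
      seen += 1
      if (seen >= n) { seen = 0; Fire } else Continue
    }
  }

  /** Wrapper that upgrades FIRE to FIRE_AND_PURGE and passes the rest through. */
  def purging(inner: () => Outcome): () => Outcome = () =>
    inner() match {
      case Fire  => FireAndPurge
      case other => other
    }

  /** Feeds `total` elements through a trigger and returns how many elements
    * are still held in the pane after the last one. */
  def paneSizeAfter(total: Int, trigger: () => Outcome): Int = {
    var pane = 0
    for (_ <- 1 to total) {
      pane += 1                              // element is added to the pane
      if (trigger() == FireAndPurge) pane = 0 // only a purge empties the pane
    }
    pane
  }
}
```

Feeding 12 elements through a bare `CountTriggerModel(5)` leaves all 12 in the pane, since nothing ever purges. Wrapping the same trigger with `purging` leaves only the 2 elements that arrived after the second firing, which is the kind of bounded state the PurgingTrigger suggestion above is aiming for.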
Custom triggers and evictors should only be used if you have special windowing semantics that cannot be addressed by Flink's built-in window strategies. As soon as you call .trigger() (and .evictor()) on a window, you have to be *very* careful and knowledgeable about the internals to get these things right. The docs are currently insufficient for these situations, but we are working on improving them.
|