Hello Flinksters,
This is perhaps too trivial for most here in this forum, but I want to have my understanding clear. I want to find the average of temperatures coming in as a stream. The tuple has the following fields: probeID, readingTimeStamp, radiationLevel, photoSensor, humidity, ambientTemperature.

The 'readingTimeStamp' field provides the 'Event Time', and this is the basis of the timeWindow(10) that I employ. I _keyBy_ the 'readingTimeStamp', collect readings for the next 10 seconds, and then compute the average of the 'ambientTemperature'. After every 10 seconds, I want to be able to find the average temperature *so far* (a single value).

Because my application defaults to parallelism == 4 (my laptop is 4-core), my understanding is that just by using a combination of an appropriate RichXXXFunction() and saving state in the RuntimeContext, I may not get the correct result. This is because, depending on the way the keys are distributed, 4 _different_ averages will be produced. Is this understanding right? If so, is using a *Broadcast variable* the solution?

Please help me plug the gap in understanding, if any.

-- Nirmalya

Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is where they should be. Now put the foundation under them."
Hi Nirmalya, I hope I have been helpful.
BR, Stefano Baghino
Hello Stefano <[hidden email]>
Many thanks for responding so quickly. Your explanation not only confirms my understanding but offers a much simpler solution. The facility of associating a specific parallelism with a given operator hadn't struck me at all. You are right that for my particular use case, that is the simplest and perhaps the most correct solution.

-- Nirmalya
Hello Stefano <[hidden email]>
I have tried to implement what I understood from your mail earlier in the day, but it doesn't seem to produce the result I expect. Here's the code snippet:

-------------------------------------------------------------------------
val env = StreamExecutionEnvironment.createLocalEnvironment(4)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

val readings =
  readIncomingReadings(env, "./sampleIOTTiny.csv")
    .keyBy(_.readingTimeStamp)
    .countWindow(4, 2)

val avgReadings =
  readings
    .apply((key: Long, w: Window, v: Iterable[IncomingDataUnit], out: Collector[Float]) => {
      val readings: Iterable[Float] = v.map(_.ambientTemperature)
      val avg = readings.sum / readings.size
      out.collect(avg)
    }).setParallelism(1)

avgReadings.print()
-------------------------------------------------------------------------

And here's the output:

-------------------------------------------------------------------------
1> 23.67
1> 21.0025
1> 23.79
2> 25.02
2> 23.3425
2> 25.02
3> 26.55
4> 19.970001
3> 18.93375
4> 25.727499
3> 18.93375
4> 25.7075
-------------------------------------------------------------------------

My understanding is that because I have associated parallelism(1) with the avgReadings transformation, it should aggregate the streams from all 4 earlier windows and then compute the single average value. It is quite apparent that there is a gap in my understanding. Could you please point out the mistake that I am making?

Many thanks in advance.

-- Nirmalya
Hello again Nirmalya,

in this case you are keying by timestamp; think of keying as grouping: this means that windows are brought together according to their timestamp. I misread your original post, but now that I see the code I understand your problem.

I've put some code here: https://github.com/stefanobaghino/flink-stream-avg-example

It should more or less be the same as your program. What I did was remove the keying. By not keying the stream, window operations (like countWindowAll instead of countWindow) act on the whole stream, thus forcing a parallelism of 1. I've put a couple of prints so that you can visualize what the parallelism is and what goes into each window. To get a better understanding of what was going on, you can put the same prints in your code as well (printing the key for each keyed window, too); you'll see that in your case windows are "grouped" by timestamp.

Hope I've been a little bit more helpful than last time. :)
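For reference, a minimal sketch of the non-keyed approach described above (illustrative only; `readIncomingReadings` and the `IncomingDataUnit` type are assumed from the earlier snippet, and the exact `apply` signature may differ slightly across Flink versions):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

// Average over the *whole* stream, 4 elements at a time, sliding by 2.
// countWindowAll is a non-parallel operation (effective parallelism 1),
// so a single, deterministic sequence of averages is produced.
val env = StreamExecutionEnvironment.createLocalEnvironment(4)

val avgReadings = readIncomingReadings(env, "./sampleIOTTiny.csv")
  .countWindowAll(4, 2)  // no keyBy: windows over the global stream
  .apply { (w: GlobalWindow, v: Iterable[IncomingDataUnit], out: Collector[Float]) =>
    val temps = v.map(_.ambientTemperature)
    out.collect(temps.sum / temps.size)  // one average per window, in order
  }

avgReadings.print()
env.execute("global-average-sketch")
```

Note that this trades parallelism for a single global view, which is exactly the trade-off discussed later in the thread.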
BR, Stefano Baghino
Hello Stefano <[hidden email]>
Sorry for the late reply. Many thanks for taking the effort to write and share an example code snippet.

I have been playing with the countWindow behaviour for some weeks now and I am generally aware of the functionality of countWindowAll(). For my use case, where I _have to observe_ the entire stream as it flows in, using countWindowAll() is probably the most obvious solution. This is what you recommend too. However, because this is going to use 1 thread only (or 1 node only in a cluster), I was thinking about ways to make use of the 'distributedness' of the framework. Hence, my question.

Your reply leads me to read and think a bit more. If I have to use parallelism to achieve what I want, I think managing a ValueState of my own is possibly the solution. If you have any other thoughts, please share.

From your earlier response: '... you can still enjoy a high level of parallelism up until the last operator by using a combiner, which is basically a reducer that operates locally ...'. Could you elaborate on this a bit, whenever you have time?

-- Nirmalya
Hi Nirmalya,

my reply was based on me misreading your original post, thinking you had a batch of data, not a stream. I see that the apply method can also take a reducer that pre-aggregates your data before passing it to the window function. I suspect that pre-aggregation runs locally just like a combiner would, but I'm really not sure about it. We should have more feedback in this regard.
BR, Stefano Baghino
Hi,
the name pre-aggregation is a bit misleading. I have started calling it incremental aggregation because it does not work like a combiner. What it does is incrementally fold (or reduce) elements as they arrive at the window operator. This reduces the amount of required space because, otherwise, all the elements would have to be stored before the window is triggered. When using an incremental fold (or reduce), the WindowFunction only gets the one final result of the incremental aggregation.

Cheers,
Aljoscha
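A minimal sketch of incremental aggregation as described here: each element is reduced into a running (sum, count) pair on arrival, so the window operator never buffers all the readings. All names are illustrative, and the exact `apply(preAggregator, windowFunction)` signature varies across Flink versions:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

// Running accumulator: only this single pair is stored per window,
// not every individual reading.
case class Acc(sum: Float, count: Long)

val avgReadings = readIncomingReadings(env, "./sampleIOTTiny.csv")
  .map(r => Acc(r.ambientTemperature, 1L))
  .countWindowAll(4, 2)
  .apply(
    // incremental reduce: invoked per element, as it arrives
    (a: Acc, b: Acc) => Acc(a.sum + b.sum, a.count + b.count),
    // window function: sees only the final accumulated result when firing
    (w: GlobalWindow, accs: Iterable[Acc], out: Collector[Float]) => {
      val acc = accs.head
      out.collect(acc.sum / acc.count)
    }
  )
```

The space saving is the point: without the incremental reduce, the window would have to hold all 4 elements until it triggers.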
Hello Aljoscha <[hidden email]>
Thanks very much for clarifying the role of pre-aggregation (rather, incremental aggregation, now that I understand the intention). It helps my understanding. Thanks to Stefano too, for keeping at my original question.

My current understanding is that if I have to compute the average of a streaming set of _temperatures_, then the *best* way to accomplish this is by employing one node (or one thread, on my laptop), losing speed but gaining deterministic behaviour in the process. I can decide to capture the average either by grouping the temperatures by count or by time. Because I am sliding the window anyway, I don't run the risk of accumulation of elements in the window and buffer overrun.

Could you please confirm if my understanding is correct? I feel happy if I 'understand' the basis of a design well! :-)

-- Nirmalya
Thanks, Aljoscha, for the explanation. Isn't there a way to apply the concept of the combiner to a streaming process?
BR, Stefano Baghino
@Nirmalya: Yes, this is right if your temperatures don’t have any other field on which you could partition them.
@Stefano: Under some circumstances it would be possible to use a combiner (I’m using the name as Hadoop MapReduce would use it, here). When the assignment of elements to windows happens based on the timestamp in the elements, and window triggering happens based on watermarks, it is possible to combine locally. The reason is that the elements will end up in the same windows regardless of the time at which the window is processed, so it can be done in two steps. Does that make sense? It’s a very ad-hoc description and I could make up a drawing or something if that helped. :D
I think combiners are pretty awesome for certain cases to minimize network usage (the average use case seems to fit perfectly); maybe it would be worthwhile to add a detailed description of the approach to the docs?

BR, Stefano Baghino
They would be awesome, but it’s not yet possible in Flink Streaming, I’m afraid.
Combiners in streaming are a bit tricky, because of their semantics:

1) Combiners always hold data back, through the pre-aggregation. That adds latency and also means the values are not in the actual windows immediately, where a trigger may expect them.

2) In batch, a combiner combines as long as there is input data, or as long as there is space in the buffer. In streaming you would need to define something like "combine, but do not hold back longer than 5 seconds" to control the latency impact. Holding data back for a limited time makes the combiner less effective (it combines fewer elements).

I think these two points limit the benefit of combiners in streaming. There are cases where they may still help, but I think they are much fewer than in batch.

Stephan
Hello Aljoscha <[hidden email]>,
You mentioned: '... Yes, this is right if your temperatures don’t have any other field on which you could partition them.'

What I am failing to understand is this: if temperatures are partitioned on some other field (in my use case, I have one such: the temp_reading_timestamp), they will be pushed to different nodes (different threads in a local run) based on that field. Because they will be computed (scattered) and later collected (gathered), how could I arrive at the _running_ average temperature? The client application needs to know *how the average temperature is changing over time*.

Could you please fill in the gap in my understanding?

-- Nirmalya
Hi,
as I understand it, the “temp_reading_timestamp” field is not a key on which you can partition your data. This is a field that would be used for assigning timestamps to the elements. In your data you also have the “probeID” field. This is a field that could be used to parallelize computation; for example, you could do the following:

val inputStream = <define some source>

val result = inputStream
  .assignAscendingTimestamps { e => e.temp_reading_timestamp }
  .keyBy { e => e.probeID }
  .timeWindow(Time.minutes(10))
  .apply(new SumFunction(), new ComputeAverageFunction())

result.print()

(Where SumFunction() would sum up temperatures and keep a count, and ComputeAverageFunction() would divide the sum by the count.)

In this way, computation is parallelized because it can be spread across several machines and partitioned by the key. Without such a key, everything has to be computed on one machine because a global view of the data is required.

Cheers,
Aljoscha
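The two functions in the snippet above are not spelled out; here is a rough sketch of what they might look like, written with anonymous functions instead of classes. The accumulator type, field names, and the exact shape of the keyed `apply(preAggregator, windowFunction)` are assumptions, and the signature has changed across Flink versions:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Per-probe running accumulator: (probe, temperature sum, reading count)
case class TempAcc(probeID: String, sum: Double, count: Long)

val result = inputStream
  .assignAscendingTimestamps(_.temp_reading_timestamp)
  .map(e => TempAcc(e.probeID, e.ambientTemperature, 1L))
  .keyBy(_.probeID)
  .timeWindow(Time.minutes(10))
  .apply(
    // the "SumFunction": incrementally sums temperatures and counts readings
    (a: TempAcc, b: TempAcc) => TempAcc(a.probeID, a.sum + b.sum, a.count + b.count),
    // the "ComputeAverageFunction": divides sum by count when the window fires
    (key: String, w: TimeWindow, accs: Iterable[TempAcc], out: Collector[(String, Double)]) => {
      val acc = accs.head
      out.collect((key, acc.sum / acc.count))
    }
  )

result.print()
```

Each key (probe) is processed independently, which is what allows the computation to be spread across machines.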
Hello Aljoscha <[hidden email]>,

My sincere apologies at the beginning if I seem to repeat the same question, almost interminably. If it is frustrating you, I seek your patience, but I really want to nail it down in my mind. :-)

The point about parallelizing is well taken. I understand why the stream should be broken into multiple partitions, and how. The understanding that is still evading me is how the use case of computing a (sliding) average temperature is achieved if the stream is scattered. I want the *running* average temperature for every 3 readings, sliding by 1 reading. I am monitoring the average temperature; if it goes beyond a certain threshold for 3 consecutive readings, I throw an alarm.

Let's take the following set of data (fields are: probeID, timestamp, temperature; the 'timestamp' field is used for the assignAscendingTimestamps() function):

P1,T1,20
P1,T2,30
P2,T2,30
P1,T3,50
P2,T3,20
P3,T3,10

Assumption: T1 < T2 < T3

Now, if we partition on the probeID, we get three partitions, thus:

P1 -> (T1,20) | (T2,30) | (T3,50)
P2 -> (T2,30) | (T3,20)
P3 -> (T3,10)

Computing the average temperature will give me *three distinct averages* here, one for each partition. I get an average per probe, not per every 3 readings [assuming a slidingWindow(3,1)], irrespective of which probe produces them. Is it even correct to expect a running average if we partition the input stream?

Hope I am making my understanding (or the lack of it) quite clear here! :-)

-- Nirmalya
Hi Nirmalya,

if you want to calculate the running average over all measurements, independent of the probe ID, then you cannot parallelize the computation. In this case you have to use a global window.

Cheers,
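Putting the thread's conclusion together for the stated use case (average of every 3 readings, sliding by 1, with an alarm above some threshold) might look roughly like the following non-parallel sketch. The threshold value, source function, and record type are assumptions for illustration:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

val AlarmThreshold = 30.0f  // assumed value, for illustration only

// Non-keyed, so the window operation runs with parallelism 1:
// one global running average across all probes.
val alarms = readIncomingReadings(env, "./sampleIOTTiny.csv")
  .map(_.ambientTemperature)
  .countWindowAll(3, 1)  // every 3 readings, sliding by 1
  .apply { (w: GlobalWindow, temps: Iterable[Float], out: Collector[String]) =>
    val avg = temps.sum / temps.size
    if (avg > AlarmThreshold) out.collect(s"ALARM: running average $avg")
  }

alarms.print()
```

Because each sliding window of 3 readings already fires once per new reading, three consecutive above-threshold averages would show up as three consecutive alarm messages.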