Hej,
I want to do the following:

1. Split a stream of incoming logs by host address.
2. For each key, create time-based windows.
3. Count the number of items in each window.
4. Feed the count into a statistical model that is maintained for each host.

Since I don't have fields to sum over, I use a (window) fold function to count the number of elements in the window. (Maybe there is a better way to do this, or it could be part of the primitives.)

My problem is that I get back a DataStream, so the distribution by key is lost. Is there a way to preserve the distribution by key? Currently I only store the count of elements in the window, so I cannot simply call keyBy again.

I could fold into tuples that contain both the count and the host address, but that feels clumsy.

Any hints are welcome.

cheers
Martin
Hi,
where are you storing the results of each window computation? Maybe you could also store them from inside a custom WindowFunction, where you just count the elements and then store the result.

On the other hand, adding a constant 1 field and doing a window reduce (à la WordCount) is going to be way more efficient, because we only have to keep one element per window (the current reduced tuple) instead of all the tuples, as we have to for a fold or WindowFunction.

If you want, you can also combine a reduce and a WindowFunction:

    WindowedStream.apply(ReduceFunction<T> preAggregator, WindowFunction<T, R, K, W> function)

Here, the ReduceFunction does the WordCount-like counting, while in the WindowFunction you get the final result and store it inside your model.

Let me know if you need more information.

Cheers,
Aljoscha

> On 03 Nov 2015, at 11:28, Martin Neumann <[hidden email]> wrote:
>
> Since I don't have fields to sum upon, I use a (window) fold function to count the number of elements in the window. (Maybe there is a better way to do this, or it could be part of the primitives)
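To illustrate why the reduce-style counting only needs one value per window, here is a minimal sketch in plain Java. It deliberately does not use the Flink API; the class `WindowCountSketch`, the `LogEvent` type, and the tumbling-window arithmetic are assumptions made up for this example, and timestamps are assumed to be non-negative.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration (no Flink dependency): counts events per host and
// tumbling window while keeping only one running value per (host, window).
public class WindowCountSketch {

    // Hypothetical event type standing in for the log POJO from the thread.
    public static final class LogEvent {
        public final String host;
        public final long timestampMillis;

        public LogEvent(String host, long timestampMillis) {
            this.host = host;
            this.timestampMillis = timestampMillis;
        }
    }

    // Returns host -> (window start -> count). Each event contributes an
    // implicit 1 that is merged into the running count, which mirrors a
    // reduce over (host, 1) tuples: no per-window list of events is kept.
    public static Map<String, Map<Long, Integer>> countPerWindow(
            Iterable<LogEvent> events, long windowSizeMillis) {
        Map<String, Map<Long, Integer>> counts = new HashMap<>();
        for (LogEvent e : events) {
            // Start of the tumbling window this timestamp falls into.
            long windowStart = e.timestampMillis - (e.timestampMillis % windowSizeMillis);
            counts.computeIfAbsent(e.host, h -> new HashMap<>())
                  .merge(windowStart, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<LogEvent> events = Arrays.asList(
                new LogEvent("10.0.0.1", 100L),
                new LogEvent("10.0.0.1", 900L),
                new LogEvent("10.0.0.2", 950L),
                new LogEvent("10.0.0.1", 1200L));
        System.out.println(countPerWindow(events, 1000L));
    }
}
```

In an actual Flink job, the running count would live in the ReduceFunction and the finished per-window value would be handed to the WindowFunction; the nested map here only stands in for that state.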
Here is some code of my current solution; if there is a better way of doing it, let me know.
...
I wish I could get rid of that annoying Tuple2 after the fold and just have a KeyedStream<Integer,String>, but I didn't find anything in the documentation that would allow me to do that.

Another curious thing is that in the second statement .keyBy(t -> t.f1) works but .keyBy(1) does not, even though they do the same thing. I'm using IDEA at the moment, so it could just be another type inference problem with that IDE.

cheers
Martin

On Tue, Nov 3, 2015 at 3:06 PM, Aljoscha Krettek <[hidden email]> wrote:
> Hi,
Hi,
I’m afraid there is no other solution besides calling keyBy() again if you require a keyed data stream. This has to do with the data model of Flink, where the key is part of the regular data element. This is different from systems that have a (Key, Value) data model. Those systems can provide primitives that only work on the value and can therefore preserve the key partitioning.

Cheers,
Aljoscha

> On 03 Nov 2015, at 18:17, Martin Neumann <[hidden email]> wrote:
>
> Here is some code of my current solution, if there is some better way of doing it let me know.
>
> KeyedStream<DataPojo,String> hostStream = inStream
>     .keyBy(t -> t.getHost());
> KeyedStream<Tuple2<Integer,String>,String> freqStream = hostStream
>     .timeWindow(Time.of(WINDOW_SIZE, TimeUnit.MILLISECONDS))
>     .fold(new Tuple2<Integer, String>(0, ""), new Count())
>     .keyBy(t -> t.f1);
> ...
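The data-model point can be made concrete with a small plain-Java sketch. It is not Flink code: `RekeySketch`, `HostCount`, and the `keyBy` helper are hypothetical names invented here to mimic, conceptually, what a key selector does. The idea it shows is that a key can only be extracted from what the element itself carries, so a window result that should be keyed again has to contain the host.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java illustration (no Flink dependency) of keying by a field that is
// part of the element itself.
public class RekeySketch {

    // Hypothetical window result: the count together with the host it belongs to.
    public static final class HostCount {
        public final String host;
        public final int count;

        public HostCount(String host, int count) {
            this.host = host;
            this.count = count;
        }
    }

    // Groups elements by a key computed from each element. A bare Integer
    // count offers nothing to select the host from, which is why the window
    // result has to carry the host along.
    public static <T, K> Map<K, List<T>> keyBy(List<T> elements, Function<T, K> keySelector) {
        Map<K, List<T>> groups = new HashMap<>();
        for (T element : elements) {
            groups.computeIfAbsent(keySelector.apply(element), k -> new ArrayList<>())
                  .add(element);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<HostCount> windowResults = Arrays.asList(
                new HostCount("10.0.0.1", 3),
                new HostCount("10.0.0.2", 1),
                new HostCount("10.0.0.1", 5));
        Map<String, List<HostCount>> byHost = keyBy(windowResults, hc -> hc.host);
        System.out.println(byHost.keySet());
    }
}
```

This is the same trade-off as the Tuple2 in the thread: the extra field looks clumsy, but it is what makes the second keyBy possible.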