Hi! I'm trying to collect some metrics by key per window and emit the full result to Kafka at the end of the window. I started with a simple count by key to test it, but my requirements are a little more complex than that.

    input
        .flatMap(new LineSplitter())
        .keyBy(0)
        .timeWindow(Time.of(5, TimeUnit.SECONDS))
        .apply(new HashMap<String, Integer>(), foldFunction, winFunction);

Reading the source code I see:

    Applies the given window function to each window. The window function is called for each evaluation of the window for each key individually. The output of the window function is interpreted as a regular non-windowed stream.

Emphasis on "for each key individually": the return type of apply is SingleOutputStreamOperator, which doesn't provide many operations to group the emitted values.
|
Hi, from this I would expect to get as many HashMaps as you have keys. The winFunction is also executed per key, so it cannot combine the HashMaps of all keys. Does this describe the behavior that you're seeing?

Cheers,
Aljoscha

On Fri, 2 Sep 2016 at 17:37 Luis Mariano Guerra <[hidden email]> wrote:
|
On Fri, Sep 2, 2016 at 5:24 PM, Aljoscha Krettek <[hidden email]> wrote:
Yes, it's the behaviour I'm seeing. I'm looking for a way to merge those HashMaps from the same window into a single one, but I can't find how.
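
For illustration, the merge being asked for can be sketched in plain Java, independent of Flink. This is only a sketch under assumptions: the class name `MergeWindowMaps` and the method `mergeCounts` are hypothetical, and it assumes the per-key results are `HashMap<String, Integer>` counts as in the snippet above. Something along these lines could serve as the body of a function that combines the maps emitted for one window.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MergeWindowMaps {

    /**
     * Combines the per-key count maps of one window into a single map.
     * If the same key shows up in more than one map, the counts are summed.
     */
    public static Map<String, Integer> mergeCounts(List<Map<String, Integer>> perKeyMaps) {
        Map<String, Integer> merged = new HashMap<>();
        for (Map<String, Integer> partial : perKeyMaps) {
            for (Map.Entry<String, Integer> entry : partial.entrySet()) {
                // Map.merge inserts the value if the key is absent, otherwise sums.
                merged.merge(entry.getKey(), entry.getValue(), Integer::sum);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // Two hypothetical per-key results from the same window.
        Map<String, Integer> first = new HashMap<>();
        first.put("flink", 3);
        Map<String, Integer> second = new HashMap<>();
        second.put("kafka", 2);

        System.out.println(mergeCounts(Arrays.asList(first, second)));
    }
}
```

The inputs are left unmodified and a fresh map is returned, which keeps the merge safe to reuse regardless of where it is called from.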
|
Hi, for this you would have to use a non-parallel window, i.e. something like stream.windowAll(<my window>).apply(...). This does not compute per key, but it has the drawback that the computation does not happen in parallel. If you only use it to combine the pre-aggregated maps it could be OK, though.

Cheers,
Aljoscha

On Fri, 2 Sep 2016 at 18:26 Luis Mariano Guerra <[hidden email]> wrote:
|
On Mon, Sep 5, 2016 at 12:30 PM, Aljoscha Krettek <[hidden email]> wrote:
Hi, thanks for the tip, it works. I was aware of the non-parallel nature of what I want to do. After seeing it work I tried this:

    input
        .flatMap(new LineSplitter())
        .keyBy(0)
        .timeWindow(Time.of(5, TimeUnit.SECONDS))
        .apply(new HashMap<String, Integer>(), timeWindowFold, timeWindowMerge)
        .windowAll(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
        .apply(new HashMap<String, Integer>(), windowAllFold, windowAllMerge);

It seems to work, but each timeWindowFold seems to accumulate a single key. I was expecting to have one or more keys per fold, depending on which processing node was handling the computation. I don't mind emitting one event per key, but I want to know whether this is OK or whether I'm missing something (maybe I have to add a line to specify the partitioning?).
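
A plain-Java sketch (no Flink dependency) of why each fold accumulator might end up holding a single key: keyed window state is kept per key and per window, so a subtask that handles several keys still folds each key into its own accumulator. All names below are hypothetical, and `subtaskFor` is a deliberately simplified hash-modulo stand-in, not Flink's actual key-to-subtask assignment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class KeyedFoldSketch {

    /** Simplified assumption: a key goes to subtask hash(key) mod parallelism. */
    public static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    /**
     * Folds the words of one window with one accumulator per key.
     * Returns key -> accumulator; each accumulator only ever sees its own key.
     */
    public static Map<String, Map<String, Integer>> foldPerKey(List<String> words) {
        Map<String, Map<String, Integer>> accumulators = new TreeMap<>();
        for (String word : words) {
            accumulators
                .computeIfAbsent(word, k -> new TreeMap<>())
                .merge(word, 1, Integer::sum);
        }
        return accumulators;
    }

    /** Groups keys by the subtask that would process them under the assumption above. */
    public static Map<Integer, List<String>> keysPerSubtask(Iterable<String> keys, int parallelism) {
        Map<Integer, List<String>> result = new TreeMap<>();
        for (String key : keys) {
            result.computeIfAbsent(subtaskFor(key, parallelism), s -> new ArrayList<>()).add(key);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "b", "a", "c", "b", "a");
        Map<String, Map<String, Integer>> accumulators = foldPerKey(words);

        // One accumulator per key, each containing exactly that key.
        System.out.println("accumulators: " + accumulators);
        // With fewer subtasks than keys, at least one subtask handles several keys.
        System.out.println("keys per subtask: " + keysPerSubtask(accumulators.keySet(), 2));
    }
}
```

Under this model, seeing one key per accumulator is expected even when a subtask is responsible for several keys. The second, non-keyed stage is the place where the per-key maps come together.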
|
Hi, what's the number of unique keys and the parallelism of your job? If the former is larger than the latter, you should indeed have one "timeWindowFold" be responsible for several keys. How are you determining whether one of these is only accumulating for a single key?

Cheers,
Aljoscha

On Mon, 5 Sep 2016 at 17:35 Luis Mariano Guerra <[hidden email]> wrote:
|