emit a single Map<String, T> per window

Luis Mariano Guerra (Fri, 2 Sep 2016)
Hi!

I'm trying to collect some metrics by key per window and emit the full result to Kafka at the end of the window. I started with a simple count by key to test it, but my requirements are a little more complex than that.

What I want to do is fold the stream events as they arrive and then, at the end of the window, merge them together and emit a single result. I don't want to accumulate all the events and compute everything at the end of the window. From my understanding of fold in other languages/libraries, this is what I need, so I'm using it via apply(stateIn, foldFun, windowFun), but it's not working.

The basic pipeline is:

    input
        .flatMap(new LineSplitter())
        .keyBy(0)
        .timeWindow(Time.of(5, TimeUnit.SECONDS))
        .apply(new HashMap<String, Integer>(), foldFunction, winFunction);

where foldFunction accumulates by key, and winFunction iterates over the HashMaps, merges them into a single result HashMap, and emits that one at the end.
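
A minimal plain-Java sketch of the incremental fold described above, assuming the stream carries `(word, count)` pairs (modeled here as `Map.Entry<String, Integer>`). It does not use Flink classes; `IncrementalFoldSketch`, `event`, `foldEvent`, and `foldAll` are hypothetical names chosen for illustration. Each event is merged into the accumulator as soon as it arrives, which is the property a fold gives you over buffering the whole window.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IncrementalFoldSketch {

    // Hypothetical helper: builds a (word, count) event, standing in for the
    // tuples that LineSplitter presumably emits.
    public static Map.Entry<String, Integer> event(String word, int count) {
        return new AbstractMap.SimpleEntry<>(word, count);
    }

    // Fold step: adds one event to the accumulator as soon as it arrives,
    // so raw events never need to be buffered until the window closes.
    public static Map<String, Integer> foldEvent(Map<String, Integer> acc,
                                                 Map.Entry<String, Integer> e) {
        acc.merge(e.getKey(), e.getValue(), Integer::sum);
        return acc;
    }

    // Folds a sequence of events starting from an empty HashMap, mirroring
    // the `new HashMap<String, Integer>()` initial value in the pipeline.
    public static Map<String, Integer> foldAll(List<Map.Entry<String, Integer>> events) {
        Map<String, Integer> acc = new HashMap<>();
        for (Map.Entry<String, Integer> e : events) {
            acc = foldEvent(acc, e);
        }
        return acc;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = foldAll(Arrays.asList(
                event("flink", 1), event("kafka", 1), event("flink", 1)));
        System.out.println(counts.get("flink") + " " + counts.get("kafka")); // prints "2 1"
    }
}
```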

This emits many single-key HashMaps instead of one map with all the keys. I tried setting setParallelism(1) in multiple places, but it still doesn't work. More confusingly, one run emitted a single map, but when I ran it again it went back to the previous behavior.

What am I doing wrong? Is there another approach?

I can provide the implementation of foldFunction and winFunction if required, but I don't think it changes much.

Reading the source code, I see:

    Applies the given window function to each window. The window function is called for each evaluation of the window for each key individually. The output of the window function is interpreted as a regular non-windowed stream.

Emphasis on "for each key individually". The return type of apply is SingleOutputStreamOperator, which doesn't provide many operations to group the emitted values.

Thanks in advance.

Re: emit a single Map<String, T> per window

Aljoscha Krettek (Fri, 2 Sep 2016)
Hi,
From this I would expect to get as many HashMaps as you have keys. The winFunction is also executed per key, so it cannot combine the HashMaps of all keys.

Does this describe the behavior that you're seeing?

Cheers,
Aljoscha
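
The per-key behavior can be modeled in plain Java, assuming one window's events are already collected in a list. This is a simplified model, not Flink's window operator; `KeyedWindowSketch` and its methods are hypothetical. Fold state is kept per key and the "window function" runs once per key, so the number of emitted maps equals the number of distinct keys. Parallelism does not appear anywhere in the model, which is consistent with `setParallelism(1)` not changing the output.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class KeyedWindowSketch {

    // Hypothetical helper: builds a (word, count) event.
    public static Map.Entry<String, Integer> event(String word, int count) {
        return new AbstractMap.SimpleEntry<>(word, count);
    }

    // Simplified model of one firing of a keyed window: events are folded into
    // a separate accumulator per key, then the window function is called once
    // per key and only ever sees that key's accumulator.
    public static List<Map<String, Integer>> evaluateKeyedWindow(
            List<Map.Entry<String, Integer>> windowEvents) {
        // One fold accumulator per key (insertion order kept for readability).
        Map<String, Map<String, Integer>> statePerKey = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : windowEvents) {
            statePerKey.computeIfAbsent(e.getKey(), k -> new HashMap<>())
                       .merge(e.getKey(), e.getValue(), Integer::sum);
        }
        // The "window function" runs per key, so each output holds one key.
        List<Map<String, Integer>> outputs = new ArrayList<>();
        for (Map<String, Integer> acc : statePerKey.values()) {
            outputs.add(new HashMap<>(acc));
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<Map<String, Integer>> out = evaluateKeyedWindow(Arrays.asList(
                event("flink", 1), event("kafka", 1), event("flink", 1)));
        System.out.println(out.size()); // prints 2: one single-key map per distinct key
    }
}
```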

Re: emit a single Map<String, T> per window

Luis Mariano Guerra (Fri, 2 Sep 2016)
Yes, that's the behaviour I'm seeing. I'm looking for a way to merge those HashMaps from the same window into a single one, but I can't find how.

Re: emit a single Map<String, T> per window

Aljoscha Krettek (Mon, 5 Sep 2016)
Hi,
For this you would have to use a non-parallel window, i.e. something like stream.windowAll(<my window>).apply(...). This does not compute per key, but it has the drawback that the computation does not happen in parallel. If you only use it to combine the pre-aggregated maps, it could be OK, though.

Cheers,
Aljoscha
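
A plain-Java sketch of the merge step that the non-parallel windowAll stage would perform, assuming the keyed stage has already produced one partial HashMap per key for the window. `WindowAllMergeSketch` and `mergePartials` are hypothetical names; in a Flink job this logic would presumably go into the function passed to `windowAll(...).apply(...)`. Because each input is already pre-aggregated, the single non-parallel step handles one small map per key rather than every raw event.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WindowAllMergeSketch {

    // Merges the per-key partial maps of one window into a single result map.
    // Summing (rather than overwriting) also covers the case where the same
    // key shows up in more than one partial map.
    public static Map<String, Integer> mergePartials(List<Map<String, Integer>> partials) {
        Map<String, Integer> result = new HashMap<>();
        for (Map<String, Integer> partial : partials) {
            for (Map.Entry<String, Integer> e : partial.entrySet()) {
                result.merge(e.getKey(), e.getValue(), Integer::sum);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> flink = new HashMap<>();
        flink.put("flink", 2);
        Map<String, Integer> kafka = new HashMap<>();
        kafka.put("kafka", 1);
        Map<String, Integer> merged = mergePartials(Arrays.asList(flink, kafka));
        System.out.println(merged.size() + " " + merged.get("flink") + " " + merged.get("kafka")); // prints "2 2 1"
    }
}
```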

Re: emit a single Map<String, T> per window

Luis Mariano Guerra (Mon, 5 Sep 2016)
Hi,

Thanks for the tip, it works. I was aware of the non-parallel nature of what I want to do. After seeing it work, I tried this:

    input.flatMap(new LineSplitter()).keyBy(0)
        .timeWindow(Time.of(5, TimeUnit.SECONDS))
        .apply(new HashMap<String, Integer>(), timeWindowFold, timeWindowMerge)
        .windowAll(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
        .apply(new HashMap<String, Integer>(), windowAllFold, windowAllMerge);

It seems to work, but each timeWindowFold seems to accumulate only a single key. I was expecting one or more keys per fold, depending on which processing node handled the computation. I don't mind emitting one event per key, but I want to know whether this is OK and whether I'm missing something (maybe I have to add a line to specify the partitioning?).

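Chaining the two stages works only if their windows line up. This relies on an assumption worth verifying against the Flink version in use: that a time window's result is emitted with the window's max timestamp (its end minus 1 ms). With equal 5-second tumbling windows and no offset, each first-stage result then lands in the matching windowAll window. The plain-Java sketch below only checks that arithmetic; `windowStart` is an assumed restatement of the tumbling-window start formula for non-negative timestamps and zero offset, not Flink code. It also assumes both stages run on event time: `timeWindow(...)` picks processing or event time from the environment's time characteristic, while the second stage explicitly uses `TumblingEventTimeWindows`.

```java
public class WindowAlignmentSketch {

    // Start of the tumbling window containing `timestamp`, for windows of
    // `size` milliseconds, zero offset, and non-negative timestamps.
    public static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Assumed output timestamp of window [start, start + size): its max timestamp.
    public static long resultTimestamp(long start, long size) {
        return start + size - 1;
    }

    public static void main(String[] args) {
        long size = 5_000L;                                      // 5 seconds in both stages
        long eventTs = 12_345L;
        long firstStageStart = windowStart(eventTs, size);       // 10000
        long emitted = resultTimestamp(firstStageStart, size);   // 14999
        long secondStageStart = windowStart(emitted, size);      // 10000
        System.out.println(firstStageStart == secondStageStart); // prints true
    }
}
```
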
Re: emit a single Map<String, T> per window

Aljoscha Krettek
Hi,
What's the number of unique keys, and what's the parallelism of your job? If the former is larger than the latter, one "timeWindowFold" should indeed be responsible for several keys. How are you determining whether one of these is only accumulating for a single key?

Cheers,
Aljoscha
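
A rough plain-Java model of the distinction in this last exchange: keys are spread over the parallel instances, so with more keys than parallelism each instance handles several keys, while keyed window state (the fold accumulator) is still kept separately per key and window. Under that model, seeing one key per fold accumulator is expected. The `hashCode()`-modulo assignment below is a simplifying assumption; Flink's actual key-to-instance mapping hashes keys differently. `KeyAssignmentSketch` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class KeyAssignmentSketch {

    // Simplified (assumed) key-to-instance assignment; only the
    // "several keys per instance" effect matters here.
    public static int instanceFor(String key, int parallelism) {
        // floorMod keeps the index non-negative for negative hash codes.
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Groups keys by the parallel instance that would process them.
    public static Map<Integer, List<String>> assign(List<String> keys, int parallelism) {
        Map<Integer, List<String>> byInstance = new TreeMap<>();
        for (String key : keys) {
            byInstance.computeIfAbsent(instanceFor(key, parallelism), i -> new ArrayList<>())
                      .add(key);
        }
        return byInstance;
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> byInstance =
                assign(Arrays.asList("a", "b", "c", "d", "e"), 2);
        // Instance 1 handles three keys here; in Flink, window state would
        // still be kept separately for each (key, window) pair.
        System.out.println(byInstance); // prints {0=[b, d], 1=[a, c, e]}
    }
}
```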
