Questions about FoldFunction and WindowFunction

Yassine MARZOUGUI
Hi all,

I have a couple of questions about FoldFunction and WindowFunction:

1. When using a RichFoldFunction after a window as in keyedStream.window().fold(new RichFoldFunction()), is the close() method called after each window or after all the windows for that key are fired?

2. When applying a FoldFunction to a window followed by a WindowFunction via apply(R initialValue, FoldFunction<T,R> foldFunction, WindowFunction<R,R,K,W> function), why must the output of the WindowFunction have the same type as its input? It would sometimes be practical to have a different output type: for example, one could fold tuples in the FoldFunction, then process the single aggregated tuple in the WindowFunction and emit an Integer.
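The decoupling asked for in question 2 can be sketched in plain Java. This is not Flink code: the interfaces FoldFn and WindowFn and the method foldThenWindow are hypothetical simplifications of FoldFunction, WindowFunction and WindowedStream, the window parameter W is dropped, and java.util.Map.Entry stands in for a tuple. The only point is the type parameters: the accumulator type ACC and the output type OUT are independent, so the window function can turn the single folded tuple into an Integer.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of "fold, then window function" with decoupled types; NOT Flink code.
class FoldThenWindowSketch {

    // Hypothetical stand-in for FoldFunction<T, ACC>.
    interface FoldFn<T, ACC> {
        ACC fold(ACC accumulator, T value);
    }

    // Hypothetical stand-in for WindowFunction<ACC, OUT, K, W>: the window
    // argument is omitted, and OUT is free to differ from ACC.
    interface WindowFn<ACC, OUT, K> {
        void apply(K key, Iterable<ACC> input, List<OUT> out);
    }

    // Folds one window's elements into a single accumulator, then passes that
    // one-element input to the window function, which may emit another type.
    static <T, ACC, OUT, K> List<OUT> foldThenWindow(
            K key, List<T> window, ACC initial,
            FoldFn<T, ACC> foldFn, WindowFn<ACC, OUT, K> windowFn) {
        ACC acc = initial;
        for (T value : window) {
            acc = foldFn.fold(acc, value);
        }
        List<OUT> out = new ArrayList<>();
        windowFn.apply(key, Collections.singletonList(acc), out);
        return out;
    }

    public static void main(String[] args) {
        // Map.Entry stands in for a (String, Integer) tuple.
        List<Map.Entry<String, Integer>> window = new ArrayList<>();
        window.add(new SimpleEntry<>("a", 2));
        window.add(new SimpleEntry<>("a", 5));
        Map.Entry<String, Integer> zero = new SimpleEntry<>("", 0);

        // Fold tuples into one aggregated tuple (ACC = Map.Entry<String, Integer>) ...
        FoldFn<Map.Entry<String, Integer>, Map.Entry<String, Integer>> sum =
                (acc, v) -> new SimpleEntry<>(v.getKey(), acc.getValue() + v.getValue());

        // ... then emit a plain Integer (OUT = Integer), which an ACC == OUT signature rules out.
        WindowFn<Map.Entry<String, Integer>, Integer, String> toInt =
                (key, aggregated, out) -> {
                    for (Map.Entry<String, Integer> t : aggregated) {
                        out.add(t.getValue());
                    }
                };

        System.out.println(foldThenWindow("a", window, zero, sum, toInt)); // [7]
    }
}
```

With a signature of the form WindowFunction<R,R,K,W>, the toInt function above could not be passed, because its output type (Integer) differs from the accumulator type.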

Best,
Yassine

Re: Questions about FoldFunction and WindowFunction

Aljoscha Krettek
Hi Yassine,

Regarding 1. The close() method of the RichFoldFunction is called only at the very end of your streaming job, not after each window, so in practice it will never be called. It exists because of batch jobs, where processing has an actual end.
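The call order described in the answer to question 1 can be illustrated with a small plain-Java simulation. This is not Flink code: LifecycleFold and runOperator are hypothetical stand-ins, and the sketch assumes, as the answer implies, that one long-lived function instance serves every window it processes, so open() and close() bracket the whole input rather than each window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation of a rich fold function's lifecycle; NOT the Flink API.
class FoldLifecycleSketch {

    // Hypothetical stand-in for a RichFoldFunction<Integer, Integer> that records its calls.
    static class LifecycleFold {
        final List<String> calls = new ArrayList<>();

        void open() { calls.add("open"); }

        Integer fold(Integer acc, Integer value) {
            calls.add("fold");
            return acc + value;
        }

        void close() { calls.add("close"); }
    }

    // Hypothetical driver: the same instance handles every window, so close()
    // runs once, and only because this (finite) input eventually ends.
    static List<Integer> runOperator(LifecycleFold fn, List<List<Integer>> windows) {
        List<Integer> results = new ArrayList<>();
        fn.open();
        for (List<Integer> window : windows) {
            Integer acc = 0;                 // fresh initial value for each window
            for (Integer v : window) {
                acc = fn.fold(acc, v);
            }
            results.add(acc);                // window fires: result emitted, no close()
        }
        fn.close();                          // never reached if the input is unbounded
        return results;
    }

    public static void main(String[] args) {
        LifecycleFold fn = new LifecycleFold();
        List<Integer> out = runOperator(fn, Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4, 5)));
        System.out.println(out);             // [3, 12]
        System.out.println(fn.calls);        // [open, fold, fold, fold, fold, fold, close]
    }
}
```

In this model, any per-window cleanup would have to happen where each window's result is emitted, since close() is not tied to window firing.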

Regarding 2. I'm afraid you came across a bug: https://issues.apache.org/jira/browse/FLINK-3869. We can't change this right now because we cannot break API stability, but at the end of that issue I propose a different solution that we'll hopefully get into the next release.

Cheers,
Aljoscha

On Wed, 2 Nov 2016 at 10:42 Yassine MARZOUGUI wrote:
[quoted text hidden]

Re: Questions about FoldFunction and WindowFunction

Yassine MARZOUGUI
Thank you Aljoscha for your quick response.

Best,
Yassine

2016-11-02 12:30 GMT+01:00 Aljoscha Krettek wrote:
[quoted text hidden]

Re: Questions about FoldFunction and WindowFunction

Aljoscha Krettek
Would you be interested in contributing a fix for that? Otherwise I'll probably work on it in the coming weeks.

On Wed, 2 Nov 2016 at 13:38 Yassine MARZOUGUI wrote:
[quoted text hidden]

Re: Questions about FoldFunction and WindowFunction

Yassine MARZOUGUI
Yes, with pleasure. Could you please assign it to me temporarily? I am not very familiar with the internal components of Flink, so it might take some time before I contribute the code. If I am not done by the time you are ready to work on it, you can reassign it to yourself.

2016-11-02 14:07 GMT+01:00 Aljoscha Krettek wrote:
[quoted text hidden]

Re: Questions about FoldFunction and WindowFunction

Aljoscha Krettek
Hi Yassine,
I made you a contributor in the Flink Jira so you will be able to assign issues to yourself in the future. I also assigned this issue to you.

I think you only need to make changes in WindowedStream and AllWindowedStream. Let me know if you need anything. :-)
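One way to make such a change while respecting the API-stability constraint mentioned earlier in the thread is to leave the existing restrictive method untouched and add a more general one next to it, with the old method delegating to the new one. The plain-Java sketch below models this for the keyed case; MiniWindowedStream, its fold method, and the FoldFn/WindowFn interfaces are hypothetical and only mimic the shape of the API (a non-keyed, AllWindowedStream-style variant would simply drop the key). It is not necessarily what the proposal on FLINK-3869 looks like.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical model of a keyed windowed stream over ONE already-assigned window; NOT Flink's WindowedStream.
class CompatibleEvolutionSketch {

    interface FoldFn<T, ACC> { ACC fold(ACC acc, T value); }

    interface WindowFn<IN, OUT, K> { void apply(K key, Iterable<IN> input, List<OUT> out); }

    static class MiniWindowedStream<T, K> {
        private final K key;
        private final List<T> elements;

        MiniWindowedStream(K key, List<T> elements) {
            this.key = key;
            this.elements = elements;
        }

        // New, more general method: accumulator type ACC and output type R are independent.
        <ACC, R> List<R> fold(ACC initial, FoldFn<T, ACC> foldFn, WindowFn<ACC, R, K> windowFn) {
            ACC acc = initial;
            for (T e : elements) {
                acc = foldFn.fold(acc, e);
            }
            List<R> out = new ArrayList<>();
            windowFn.apply(key, Collections.singletonList(acc), out);
            return out;
        }

        // Old, restrictive signature kept unchanged so existing callers still compile;
        // it simply delegates to the general method with ACC = R.
        <R> List<R> apply(R initial, FoldFn<T, R> foldFn, WindowFn<R, R, K> windowFn) {
            return fold(initial, foldFn, windowFn);
        }
    }

    public static void main(String[] args) {
        MiniWindowedStream<Integer, String> stream =
                new MiniWindowedStream<>("k", Arrays.asList(1, 2, 3));

        // Old style: the output type must equal the accumulator type (Integer).
        List<Integer> oldStyle = stream.apply(0, (acc, v) -> acc + v,
                (key, in, out) -> { for (Integer s : in) out.add(s); });

        // New style: Integer accumulator, String output.
        List<String> newStyle = stream.fold(0, (acc, v) -> acc + v,
                (key, in, out) -> { for (Integer s : in) out.add(key + "=" + s); });

        System.out.println(oldStyle + " " + newStyle);   // [6] [k=6]
    }
}
```

The general method needs a different name in this sketch: two generic apply overloads with these parameter lists would have the same erasure, apply(Object, FoldFn, WindowFn), and Java rejects that as a name clash.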

Cheers,
Aljoscha

On Wed, 2 Nov 2016 at 14:48 Yassine MARZOUGUI wrote:
[quoted text hidden]