Re: Transfer information from one window to the next

Re: Transfer information from one window to the next

施晓罡(星罡)
Hi sonex

I think you can accomplish this by using a PassThroughFunction as the apply function and then processing the elements in a rich flatMap function that follows it. You can keep the information in the flatMap function (via state) so that it can be shared among different windows.

The program may look like

stream.keyBy(...).timeWindow(...)
    // pass-through window function: forward each window's contents unchanged
    .apply(new WindowFunction<IN, OUT, K, W>() {
        @Override
        public void apply(K key, W window, Iterable<IN> elements, Collector<OUT> out) {
            out.collect(new Tuple3<>(key, window, elements));
        }
    })
    .keyBy(0)     // use the same key as the windows
    .flatMap(...) // process the windows with shared information

Regards,
Xiaogang


------------------------------------------------------------------
From: Sonex <[hidden email]>
Sent: Monday, February 20, 2017, 16:32
To: user <[hidden email]>
Subject: Transfer information from one window to the next

val stream = inputStream
  .assignAscendingTimestamps(_.eventTime)
  .keyBy(_.inputKey)
  .timeWindow(Time.seconds(3600), Time.seconds(3600))

stream.apply{...}

Given the above example I want to transfer information (variables and
values) from the current apply function to the apply function of the next
window (of course with the same key).

How can I do that?



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Transfer-information-from-one-window-to-the-next-tp11737.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.

Re: Re: Transfer information from one window to the next

Sonex
I don't think you understood the question correctly. I am not interested in sharing information between windows that cover the same time span (e.g., window start = 0, window end = 3600). I want to pass a variable, say for key 1, from the apply function of window 0-3600 to the apply function of window 3600-7200, also for key 1.

Re: Re: Transfer information from one window to the next

施晓罡(星罡)
Hi Sonex

Because the stream is keyed again before the flatMap, all windows under the same key (e.g., TimeWindow(0, 3600) and TimeWindow(3600, 7200)) are processed by the flatMap function with the same keyed state. You can put the variable drawn from TimeWindow(0, 3600) into a State. When you receive TimeWindow(3600, 7200), you can read the state and apply your function with the stored variable.
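
A minimal sketch of this idea in plain Java, with no Flink dependency, so it only models the pattern rather than showing the real API. The names WindowResult, process and run are hypothetical, and the HashMap is a stand-in for keyed state; in an actual job that role would presumably be played by something like a ValueState obtained inside a rich flatMap function. The sketch assumes each window has already been reduced to a single sum and that the windows of a key arrive in time order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WindowCarryOverSketch {

    /** Hypothetical stand-in for what one window emits: key, window start, and a per-window sum. */
    public static final class WindowResult {
        final String key;
        final long windowStart; // in seconds, matching the 0 / 3600 / 7200 used in the thread
        final long sum;

        public WindowResult(String key, long windowStart, long sum) {
            this.key = key;
            this.windowStart = windowStart;
            this.sum = sum;
        }
    }

    /** Stand-in for keyed state: one remembered value per key (an assumption, not Flink's API). */
    private final Map<String, Long> previousSumByKey = new HashMap<>();

    /** Handles one window result: reads the value stored by the previous window of the same key, then overwrites it. */
    public String process(WindowResult w) {
        Long previous = previousSumByKey.get(w.key); // null for the first window of a key
        previousSumByKey.put(w.key, w.sum);          // hand this window's value to the next window
        String prev = (previous == null) ? "none" : String.valueOf(previous);
        return w.key + "@" + w.windowStart + ": sum=" + w.sum + ", previous=" + prev;
    }

    /** Feeds the window results through a single stateful function, in the order given. */
    public static List<String> run(List<WindowResult> results) {
        WindowCarryOverSketch fn = new WindowCarryOverSketch();
        List<String> out = new ArrayList<>();
        for (WindowResult r : results) {
            out.add(fn.process(r));
        }
        return out;
    }

    public static void main(String[] args) {
        List<WindowResult> results = new ArrayList<>();
        results.add(new WindowResult("k1", 0L, 10L));
        results.add(new WindowResult("k2", 0L, 7L));
        results.add(new WindowResult("k1", 3600L, 4L));
        for (String line : run(results)) {
            System.out.println(line);
        }
    }
}
```

With this input, the second window of k1 sees the value 10 left behind by its first window, while k2 is unaffected because the stored value is scoped to the key.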

Regards,
Xiaogang

------------------------------------------------------------------
From: Sonex <[hidden email]>
Sent: Monday, February 20, 2017, 19:54
To: user <[hidden email]>
Subject: Re: Re: Transfer information from one window to the next

I don't think you understood the question correctly. I am not interested in
sharing information between windows that cover the same time span (e.g.,
window start = 0, window end = 3600). I want to pass a variable, say for
key 1, from the apply function of window 0-3600 to the apply function of
window 3600-7200, also for key 1.





Re: Re: Re: Transfer information from one window to the next

Sonex
Hi, and thank you for your response.

Is it possible to give me a simple example? How can I put the variable into a state and then access that state in the next apply function?

I am new to Flink.

Thank you.

Re: Re: Transfer information from one window to the next

Sonex
I figured out the state you were talking about.

The solution would look like this (similar to what you wrote):

stream.keyBy(...).timeWindow(...)
    // pass-through window function: forward each window's contents unchanged
    .apply(new WindowFunction<IN, OUT, K, W>() {
        @Override
        public void apply(K key, W window, Iterable<IN> elements, Collector<OUT> out) {
            out.collect(new Tuple3<>(key, window, elements));
        }
    })
    .keyBy(0)          // use the same key as the windows
    .mapWithState(...) // process the windows with shared information