save state in windows operation

save state in windows operation

Rubén Casado
Hello,

We are having problems working with state in Flink, and I am sure you can help us :-)

Let's say we have a workflow like this:

DataStream<K> myData = env.from...

myData.map(new MyMap(..))
            .keyBy(0)
            .countWindow(n)
            .apply(new MyApplyFunction())
            .writeAsCsv(...)

To implement the logic of our MyApplyFunction, the apply() method needs access to the result of the previous window computation. Before emitting the results in apply() with collector.collect(..), we could save that result in an external storage system (e.g. Redis/Hazelcast) and read it back at the beginning of the next window computation, but we would like to use an internal Flink mechanism for this instead.

Could someone help us with this? Thanks in advance!!! :-)

Best

______________________________________

Dr. Rubén Casado
Head of Big Data
Treelogic
ruben.casado.treelogic

+34 902 286 386 - +34 607 18 28 06

Parque Tecnológico de Asturias · Parcela 30
E33428 Llanera · Asturias [Spain]
www.treelogic.com
______________________________________

Re: save state in windows operation

Aljoscha Krettek
Hi,
you should be able to do this using Flink's state abstraction in a RichWindowFunction like this:

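import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

// Static nested class of the job class.
// KEY is Tuple because keyBy(0) keys by field position; W is GlobalWindow because countWindow(n) uses global windows.
public static class MyApplyFunction extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, GlobalWindow> {

    // state name, type information, default value returned by value() when nothing has been stored yet
    private final ValueStateDescriptor<Tuple2<String, Integer>> stateDescriptor =
            new ValueStateDescriptor<>("last-result",
                    new TypeHint<Tuple2<String, Integer>>() {}.getTypeInfo(),
                    null);

    @Override
    public void apply(Tuple key,
            GlobalWindow window,
            Iterable<Tuple2<String, Integer>> input,
            Collector<Tuple2<String, Integer>> out) throws Exception {
        // the state handle is scoped to the key of the window being evaluated
        ValueState<Tuple2<String, Integer>> state = getRuntimeContext().getState(stateDescriptor);

        Tuple2<String, Integer> lastResult = state.value();
        if (lastResult != null) {
            // do something with the previous window's result
        } else {
            // first window for this key: no previous result yet
        }

        // do our computation (placeholder result)
        Tuple2<String, Integer> result = new Tuple2<>("hey there", 42);

        out.collect(result);
        // store for future use by the next window of this key
        state.update(result);
    }
}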

The arguments of ValueStateDescriptor are: the state name, the TypeInformation for the values in the state, and the default value that state.value() returns when nothing has been set yet.

Also, keep in mind that the state is local to each key, just as the window is: in apply() you only see the last result stored for the key of the current window.

Cheers,
Aljoscha

On Thu, 21 Apr 2016 at 11:10 Rubén Casado wrote:

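To make the per-key behaviour described in the reply above concrete, here is a plain-Java model of it. This is not Flink code and uses no Flink classes: a HashMap stands in for the keyed ValueState, a null or explicit defaultValue plays the role of the descriptor's default, and the class name KeyedCountWindowModel, its methods, and the running-sum computation are all hypothetical, chosen only for illustration. It assumes tumbling count windows that discard their elements after firing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model, NOT Flink API: a keyed count window whose
// apply step can read the result it stored for the same key last time.
final class KeyedCountWindowModel {
    private final int windowSize;      // the "n" in countWindow(n)
    private final Integer defaultValue; // what the "state" returns before anything is stored
    private final Map<String, List<Integer>> buffers = new HashMap<>(); // open window per key
    private final Map<String, Integer> lastResult = new HashMap<>();    // stand-in for keyed ValueState

    public KeyedCountWindowModel(int windowSize, Integer defaultValue) {
        this.windowSize = windowSize;
        this.defaultValue = defaultValue;
    }

    // Feeds one element; returns the window result if this key's window fires, else null.
    public Integer add(String key, int value) {
        List<Integer> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(value);
        if (buffer.size() < windowSize) {
            return null; // window for this key is not full yet
        }
        Integer result = apply(key, buffer);
        buffer.clear(); // tumbling: the fired window's elements are discarded
        return result;
    }

    // Analogue of MyApplyFunction.apply(): read previous result, compute, store.
    private Integer apply(String key, List<Integer> window) {
        // like state.value(): the stored value for this key, or the default
        Integer previous = lastResult.containsKey(key) ? lastResult.get(key) : defaultValue;
        int sum = 0;
        for (int v : window) {
            sum += v;
        }
        // hypothetical computation: window sum plus the previous result, if any
        int result = (previous == null) ? sum : sum + previous;
        lastResult.put(key, result); // like state.update(result)
        return result;
    }
}
```

For example, with a window size of 2 and no default, feeding (a,1), (b,10), (a,2), (a,4), (a,5), (b,20) fires key a with 3 and then 12, and key b with 30: key b never sees the value stored for key a.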
Re: save state in windows operation

Rubén Casado
Thanks for your help!! That is exactly what we need :-)

----- Original message -----
From: "Aljoscha Krettek"
Sent: Thursday, 21 April 2016 11:21:00 GMT +01:00 Amsterdam / Berlin / Bern / Rome / Stockholm / Vienna
Subject: Re: save state in windows operation

Re: save state in windows operation

machouicha
Hi,
I am basically trying to do the same thing, but I cannot pass my WindowMetrics function to the apply() method of my stream. The compiler complains:

The method apply(WindowFunction<Account,R,Tuple,GlobalWindow>) in the type WindowedStream<Account,Tuple,GlobalWindow> is not applicable for the arguments (AccountDriRule2.WindowMetrics)

Example:
 outstandingAcctStream.keyBy(0).countWindow(10).apply(new WindowMetrics());

class WindowMetrics extends RichWindowFunction<Account, Long, Long, TimeWindow> {
           ......
}

What am I doing wrong? Thanks