Null Flink State



Taher Koitawala
Hi All,
I am trying to access the elements stored in the state of a window. Since a window is itself a stateful operator, I think I should be able to get those records in the process function after the window is triggered. Can someone tell me why, in the following code, the state of the window is null?

Below is a mocked-up piece of the code we are using. Am I doing something wrong here?

env.enableCheckpointing(20000L);
env.setStateBackend(new FsStateBackend("path"));
// Kafka consumer arguments (topic, deserialization schema, properties) are omitted in this mock
DataStream<S> stream1 = env.addSource(new FlinkKafkaConsumer<S>(...));
DataStream<S> stream2 = env.addSource(new FlinkKafkaConsumer<S>(...));

stream1.union(stream2)
    .keyBy(...) // key selector omitted in this mock; K is the key type
    .timeWindow(Time.milliseconds(30L))
    .allowedLateness(Time.minutes(1))
    .process(new ProcessWindowFunction<S, R, K, TimeWindow>() {
        @Override
        public void process(K key, Context ctx, Iterable<S> itr, Collector<R> collector) throws Exception {
            KeyedStateStore globalState = ctx.globalState();
            ValueState<Tuple6<Long, String, String, String, String, String>> valueState =
                globalState.getState(new ValueStateDescriptor<>("valueState",
                    TypeInformation.of(new TypeHint<Tuple6<Long, String, String, String, String, String>>() {})));

            System.out.println(valueState.value()); // prints null
            collector.collect(...); // output record construction omitted in this mock
        }
    });



Regards,
Taher Koitawala
GS Lab Pune
+91 8407979163

Re: Null Flink State

Dawid Wysakowicz

Hi Taher,

Until you put something into the state, ValueState#value() returns null. The point of ctx.globalState() (1) and ctx.windowState() (2) is to let users store their own state, scoped to (1) the key and (2) the key and window, respectively. If you want to access all elements assigned to the window, you can iterate over them with the "itr" parameter in your example.
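To make the scoping described above concrete, here is a small plain-Java model. It is a sketch, not Flink's API: `WindowStateModel`, `SimpleValueState` and `ScopedStore` are hypothetical names, and the model simply assumes that "global" state is addressed by key alone while "window" state is addressed by key plus window, as the reply describes for `ctx.globalState()` and `ctx.windowState()`. It shows that a value reads as null until something is written, that the window's contents arrive through the `Iterable` argument rather than through a state handle, and that a late firing of the same window (possible with `allowedLateness`) sees the per-window value written by the earlier firing.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of how per-key and per-window user state are scoped in a
// window function. Hypothetical classes for illustration only; NOT Flink's API.
public class WindowStateModel {

    /** Single-value holder: value() returns null until update() is called. */
    public static final class SimpleValueState<V> {
        private V value;

        public V value() { return value; }

        public void update(V newValue) { value = newValue; }
    }

    /** Holds one SimpleValueState per (scope, name); a new scope starts empty. */
    public static final class ScopedStore {
        private final Map<String, SimpleValueState<?>> states = new HashMap<>();

        @SuppressWarnings("unchecked")
        public <V> SimpleValueState<V> getState(String scope, String name) {
            return (SimpleValueState<V>) states.computeIfAbsent(
                    scope + "/" + name, k -> new SimpleValueState<Object>());
        }
    }

    /**
     * Shaped loosely like ProcessWindowFunction#process: the window contents arrive
     * as an Iterable, and the two state handles hold only what this method writes.
     */
    public static String process(ScopedStore store, String key, String window,
                                 Iterable<Integer> elements) {
        // "Global" state: scoped to the key only, shared by all windows of that key.
        SimpleValueState<Integer> perKey = store.getState("key=" + key, "firings");
        // "Window" state: scoped to key + window.
        SimpleValueState<Integer> perWindow =
                store.getState("key=" + key + ",window=" + window, "firings");

        // Nothing has been written yet on the first firing, so both reads return null.
        Integer keyFiringsBefore = perKey.value();
        Integer windowFiringsBefore = perWindow.value();

        // The buffered elements are read from the Iterable, not from a state handle.
        int sum = 0;
        for (int element : elements) {
            sum += element;
        }

        perKey.update(keyFiringsBefore == null ? 1 : keyFiringsBefore + 1);
        perWindow.update(windowFiringsBefore == null ? 1 : windowFiringsBefore + 1);

        return "sum=" + sum + " keyBefore=" + keyFiringsBefore
                + " windowBefore=" + windowFiringsBefore;
    }

    public static void main(String[] args) {
        ScopedStore store = new ScopedStore();
        // First firing of window w1 for key "a": no state written yet.
        System.out.println(process(store, "a", "w1", Arrays.asList(1, 2, 3)));
        // prints: sum=6 keyBefore=null windowBefore=null

        // Late firing of the same window with one more element.
        System.out.println(process(store, "a", "w1", Arrays.asList(1, 2, 3, 4)));
        // prints: sum=10 keyBefore=1 windowBefore=1

        // Next window for the same key: key-scoped state carries over, window state is fresh.
        System.out.println(process(store, "a", "w2", Arrays.asList(5)));
        // prints: sum=5 keyBefore=2 windowBefore=null
    }
}
```

In real Flink code, the Flink documentation recommends cleaning up anything stored via `ctx.windowState()` by overriding `ProcessWindowFunction#clear()`, which is called when the window is purged.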

Best,

Dawid


(In reply to Taher Koitawala's message of 25/09/18 15:07.)

Re: Null Flink State

Taher Koitawala
Hi Dawid,
Thanks for the answer. How do I get the state of the window, then? I understand that elements are going into state, since a window is itself a stateful operator. How do I get access to those elements?

Regards,
Taher Koitawala


(In reply to Dawid Wysakowicz's message of Tue, Sep 25, 2018, 6:51 PM.)