Re: Null Flink State

Posted by Dawid Wysakowicz-2 on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Null-Flink-State-tp23373p23374.html

Hi Taher,

Until you put something into the state, ValueState#value() returns null. The point of ctx.globalState() (1) and ctx.windowState() (2) is to let users store their own state, scoped to the key (1) and to the key and window (2), respectively. If you want to access all elements assigned to that window, you can iterate over them with the "itr" in your example.
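
To make the two scopes concrete, here is a minimal plain-Java model of them. It is only a sketch and does not use Flink's API: the class ScopedStateSketch and its methods are hypothetical names made up for illustration, and the comments note which Flink call each part is meant to mirror.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the two state scopes; NOT Flink's API.
// All class and method names here are hypothetical.
public class ScopedStateSketch {
    // Models ctx.globalState(): one value per key, shared by all windows of that key.
    private final Map<String, Long> perKey = new HashMap<>();
    // Models ctx.windowState(): one value per (key, window) pair.
    private final Map<String, Long> perKeyAndWindow = new HashMap<>();

    // Like ValueState#value(): returns null until something has been stored.
    public Long globalValue(String key) {
        return perKey.get(key);
    }

    public Long windowValue(String key, long windowStart) {
        return perKeyAndWindow.get(key + "@" + windowStart);
    }

    // Models one window firing: counts the window's elements and stores the
    // count in both scopes, treating a null read as "no state yet".
    public void fire(String key, long windowStart, Iterable<String> elements) {
        long count = 0;
        for (String ignored : elements) {
            count++;
        }
        Long soFar = globalValue(key);
        perKey.put(key, (soFar == null ? 0L : soFar) + count);
        perKeyAndWindow.put(key + "@" + windowStart, count);
    }

    public static void main(String[] args) {
        ScopedStateSketch state = new ScopedStateSketch();
        System.out.println(state.globalValue("a"));       // null: nothing stored yet
        state.fire("a", 0L, Arrays.asList("x", "y"));
        state.fire("a", 30L, Arrays.asList("z"));
        System.out.println(state.globalValue("a"));       // 3: accumulated across windows
        System.out.println(state.windowValue("a", 30L));  // 1: only that window's count
        System.out.println(state.windowValue("a", 60L));  // null: window never fired
    }
}
```

In this model, the first read returns null because nothing has been written yet, which matches what the println in the original code shows. The elements of the window never appear in either map; they arrive only through the iterable.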

Best,

Dawid


On 25/09/18 15:07, Taher Koitawala wrote:
Hi All,
I am trying to access elements stored in the state of the window. As the window itself is a stateful operator, I think I should be able to get the records in the process function after the window is triggered. Can someone tell me why the state of the window is null in the following code?

Below is a mocked piece of code we are using. Am I doing something wrong here?

env.enableCheckpointing(20000L);
env.setStateBackend(new FsStateBackend("path"));
DataStream<S> stream1 = env.addSource(new FlinkKafkaConsumer);
DataStream<S> stream2 = env.addSource(new FlinkKafkaConsumer);

stream1.union(stream2)
    .keyBy()
    .timeWindow(Time.milliseconds(30L))
    .allowedLateness(Time.minutes(1))
    .process(new ProcessWindowFunction<T>() {
        public void process(T t, ProcessWindowFunction<T>.Context ctx, Iterable<R> itr, Collector<R> collector) {
            KeyedStateStore globalState = ctx.globalState();
            ValueState<Tuple6<Long, String, String, String, String, String>> valueState = ctx.globalState().getState(new ValueStateDescriptor<>("valueState", TypeInformation.of(new TypeHint<T>() {})));

            System.out.println(valueState.value());
            collector.collect(T);
        }
    });



Regards,
Taher Koitawala
GS Lab Pune
+91 8407979163