Hi All,

I am trying to access elements stored in the state of the window. Since the window itself is a stateful operator, I think I should be able to get the records in the process function after the window is triggered. Can someone tell me why the state of the window is null in the following code? Below is a mocked piece of the code we are using. Am I doing something wrong here?

env.enableCheckpointing(20000L);
env.setStateBackend(new FsStateBackend("path"));

DataStream<S> stream1 = env.addSource(new FlinkKafkaConsumer);
DataStream<S> stream2 = env.addSource(new FlinkKafkaConsumer);

stream1.union(stream2)
    .keyBy()
    .timeWindow(Time.milliseconds(30L))
    .allowedLateness(Time.minutes(1))
    .process(new ProcessWindowFunction<T>() {
        public void process(T t, ProcessWindowFunction<T>.Context ctx, Iterable<R> itr, Collector<R> collector) {
            KeyedStateStore globalState = ctx.globalState();
            ValueState<Tuple6<Long, String, String, String, String, String>> valueState =
                ctx.globalState().getState(new ValueStateDescriptor<>("valueState", TypeInformation.of(new TypeHint<T>() {})));
            // Prints null: nothing has been written to this state yet.
            System.out.println(valueState.value());
            collector.collect(T);
        }
    })

Regards,
Taher Koitawala
GS Lab Pune
+91 8407979163
Hi Taher,

As long as you don't put something into the state, ValueState#value() will return null. The point of having ctx.globalState() (1) and ctx.windowState() (2) is to let users store their own state, scoped to the key (1) and to the key and window (2), respectively. If you want to access all elements assigned to that window, you can iterate over them with "itr" in your example.

Best,
Dawid

On 25/09/18 15:07, Taher Koitawala wrote:
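To make Dawid's point concrete, here is a minimal plain-Java sketch of the scoping he describes. It is a model only, not Flink code: the class WindowStateModel and its method fire are hypothetical names invented for illustration, and the two maps merely stand in for what ctx.globalState() (per key) and ctx.windowState() (per key and window) hold. It shows that user state reads as null until the function itself writes to it, while the window's elements always arrive through the iterable.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the scoping rules; this is NOT Flink's API.
class WindowStateModel {
    // Stands in for ctx.globalState(): user state scoped to the key only.
    private final Map<String, Long> globalState = new HashMap<>();
    // Stands in for ctx.windowState(): user state scoped to key and window.
    private final Map<String, Long> windowState = new HashMap<>();

    // Models one firing of process() for a given key and window.
    // 'elements' plays the role of the Iterable ("itr") passed to process().
    List<String> fire(String key, long windowStart, List<String> elements) {
        String windowScope = key + "@" + windowStart;

        // Both reads return null until this function has written something.
        Long seenForKey = globalState.get(key);
        Long firingsForWindow = windowState.get(windowScope);

        List<String> out = new ArrayList<>();
        out.add("global before=" + seenForKey + ", window before=" + firingsForWindow);

        // The window contents come from the iterable, not from user state.
        for (String element : elements) {
            out.add(element);
        }

        // User state only holds what the function stores explicitly.
        globalState.put(key, (seenForKey == null ? 0L : seenForKey) + elements.size());
        windowState.put(windowScope, (firingsForWindow == null ? 0L : firingsForWindow) + 1);
        return out;
    }
}
```

In the mocked job from the question, the corresponding read is ctx.globalState().getState(descriptor).value(), which stays null until update() has been called on that state, and the window's records are the ones reachable through the itr parameter.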
Hi Dawid,

Thanks for the answer. How do I get the state of the window, then? I understand that the elements go into state, since the window itself is a stateful operator. How do I get access to those elements?

Regards,
Taher Koitawala
GS Lab Pune
+91 8407979163

On Tue, Sep 25, 2018 at 6:51 PM Dawid Wysakowicz <[hidden email]> wrote: