Hi,

Initializing MapState hangs in a window function. However, if I use ValueState, it is initialized successfully. I am using RocksDB to store the state.

    public class MyWindowFunction extends RichWindowFunction<Event, Payload, Tuple, TimeWindow> {

        private transient MapStateDescriptor<String, String> productsDescriptor =
                new MapStateDescriptor<>("mapState", String.class, String.class);

        @Override
        public void apply(Tuple key, TimeWindow window, final Iterable<Event> input,
                final Collector<Payload> out) {
            // do something
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            System.out.println("## open init window state ");
            MapState<String, String> state =
                    this.getRuntimeContext().getMapState(productsDescriptor); // <<< program hangs here
            System.out.println("## open window state " + state);
        }
    }

Thanks for the help.
|
Hi Ahmad, Which version of Flink do you use? Thanks, vino. On Fri, Oct 19, 2018 at 11:32 PM, Ahmad Hassan <[hidden email]> wrote:
|
Flink 1.6.0. ValueState initializes successfully, but MapState hangs.
Regards
|
Hi Ahmad, Can you try to dump thread info from the Task Manager's JVM instance? Thanks, vino. On Sat, Oct 20, 2018 at 4:24 PM, Ahmad Hassan <[hidden email]> wrote:
|
2018-10-22 13:46:31,944 INFO org.apache.flink.runtime.taskmanager.Task - Window(SlidingProcessingTimeWindows(180000, 180000), TimeTrigger, MetricWindowFunction) -> Map -> Sink: Unnamed (1/1) (5677190a0d292df3ad8f3521519cd980) switched from RUNNING to FAILED.
java.lang.NullPointerException: The state properties must not be null
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.checkPreconditionsAndGetKeyedStateStore(StreamingRuntimeContext.java:174)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:168)
    at com.sap.hybris.conversion.flink.processors.chain.MetricWindowFunction.open(MetricWindowFunction.java:62)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.api.java.operators.translation.WrappingFunction.open(WrappingFunction.java:45)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:219)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745)

On Sat, 20 Oct 2018 at 11:29, vino yang <[hidden email]> wrote:
|
I think that's because you declared it as a transient field. Move the declaration inside the open() function to resolve that. On Mon, Oct 22, 2018 at 3:48 PM Ahmad Hassan <[hidden email]> wrote:
|
Hi Ahmad, I think Alexander is right. You've declared the state descriptor transient, which effectively makes it null on the worker node when the state is accessed. Remove the transient modifier or instantiate the descriptor in the open method. The common pattern is to have the state itself as a transient field rather than the descriptor. Best, Dawid On 22/10/2018 15:15, Alexander Smirnov wrote:
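The effect described above can be reproduced without Flink. The following is only a minimal sketch in plain Java, assuming that the function object reaches the worker through standard Java serialization. `Descriptor`, `BrokenFunction`, `FixedFunction`, and `roundTrip` are hypothetical stand-ins made up for illustration and are not Flink classes. It shows that a transient field with an inline initializer comes back as null after a serialization round trip, while a field assigned in an `open()`-style method is populated.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldDemo {

    /** Hypothetical stand-in for a state descriptor; NOT a Flink class. */
    static class Descriptor implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;

        Descriptor(String name) {
            this.name = name;
        }
    }

    /** Mirrors the original post: a transient descriptor initialized inline. */
    static class BrokenFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        // Skipped during serialization; the initializer does not run on deserialization.
        transient Descriptor descriptor = new Descriptor("mapState");
    }

    /** Mirrors the suggested fix: the descriptor is created in open(). */
    static class FixedFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        transient Descriptor descriptor;

        void open() {
            descriptor = new Descriptor("mapState");
        }
    }

    /** Serializes and deserializes a value, imitating shipping it to a worker (assumption). */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        BrokenFunction broken = roundTrip(new BrokenFunction());
        System.out.println("broken descriptor is null: " + (broken.descriptor == null));

        FixedFunction fixed = roundTrip(new FixedFunction());
        fixed.open(); // runs after deserialization, as open() does on the worker
        System.out.println("fixed descriptor name: " + fixed.descriptor.name);
    }
}
```

In the window function from the original post, the equivalent change would be to build the `MapStateDescriptor` inside `open()` (or drop `transient` from the descriptor field) and keep the `MapState` handle itself as the transient field.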
|
Thank you. That worked. Regards, On Tue, 23 Oct 2018 at 09:08, Dawid Wysakowicz <[hidden email]> wrote:
|