Hi, I am trying to use a ListState in a RichCoFlatMapFunction but when calling: getRuntimeContext().getListState(descriptor) in the open-function i am getting a "State key serializer has not .." exception. I am not sure what i can do to avoid this exception - Are any of you able to provide some help ? Best regards Jacob private ListState<Tuple2<String, Integer>> deltaPositions; @Override public void open(org.apache.flink.configuration.Configuration parameters) throws Exception { // Create state variable ListStateDescriptor<Tuple2<String, Integer>> descriptor = new ListStateDescriptor<>( "deltaPositions", // the state name TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() { })); deltaPositions = getRuntimeContext().getListState(descriptor); }; 2016-06-22 20:41:38,813 INFO org.apache.flink.runtime.taskmanager.Task - Stream of Items with collection of meadian times (1/1) switched to FAILED with exception. java.lang.RuntimeException: Error while getting state at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:131) at crisplant.bigdata.dataanalysis.baggagemonitor.streaming.liveitemeventsstoring.LiveItemEventsStoring$MergeMedianTimesFlatMapFunction.open(LiveItemEventsStoring.java:83) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91) at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.open(CoStreamFlatMap.java:49) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.Exception: State key serializer has not been configured in the config. This operation cannot use partitioned state. at org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260) at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:129) ... 8 more 2016-06-22 20:41:38,815 INFO org.apache.flink.runtime.taskmanager.Task - Freeing ta |
Hi Jacob, the `ListState` abstraction is a state which we call partitioned/key-value state. As such, it is only possible to use it with a keyed stream. This means that you have to call `keyBy` after the `connect` API call. Cheers, Till On Wed, Jun 22, 2016 at 9:17 PM, Jacob Bay Larsen <[hidden email]> wrote:
|
We should adjust the error message to
contain the keyed stream thingy.
On 23.06.2016 10:11, Till Rohrmann wrote:
|
+1 for a more helpful error message.
@Jacob Would you mind opening a JIRA issue at https://issues.apache.org/jira/browse/FLINK? On Thu, Jun 23, 2016 at 11:31 AM, Chesnay Schepler <[hidden email]> wrote: > We should adjust the error message to contain the keyed stream thingy. > > > On 23.06.2016 10:11, Till Rohrmann wrote: > > Hi Jacob, > > the `ListState` abstraction is a state which we call partitioned/key-value > state. As such, it is only possible to use it with a keyed stream. This > means that you have to call `keyBy` after the `connect` API call. > > Cheers, > Till > > On Wed, Jun 22, 2016 at 9:17 PM, Jacob Bay Larsen <[hidden email]> wrote: >> >> Hi, >> >> I am trying to use a ListState in a RichCoFlatMapFunction but when >> calling: getRuntimeContext().getListState(descriptor) in the open-function i >> am getting a "State key serializer has not .." exception. I am not sure what >> i can do to avoid this exception - Are any of you able to provide some help >> ? >> >> Best regards >> Jacob >> >> >> private ListState<Tuple2<String, Integer>> deltaPositions; >> >> @Override >> public void open(org.apache.flink.configuration.Configuration >> parameters) throws Exception { >> // Create state variable >> ListStateDescriptor<Tuple2<String, Integer>> descriptor = >> new ListStateDescriptor<>( >> "deltaPositions", // the state name >> TypeInformation.of(new TypeHint<Tuple2<String, >> Integer>>() { >> })); >> >> deltaPositions = getRuntimeContext().getListState(descriptor); >> }; >> >> >> >> 2016-06-22 20:41:38,813 INFO org.apache.flink.runtime.taskmanager.Task >> - Stream of Items with collection of meadian times (1/1) switched to FAILED >> with exception. >> java.lang.RuntimeException: Error while getting state >> at >> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:131) >> at >> crisplant.bigdata.dataanalysis.baggagemonitor.streaming.liveitemeventsstoring.LiveItemEventsStoring$MergeMedianTimesFlatMapFunction.open(LiveItemEventsStoring.java:83) >> at >> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38) >> at >> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91) >> at >> org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.open(CoStreamFlatMap.java:49) >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317) >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215) >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) >> at java.lang.Thread.run(Thread.java:745) >> Caused by: java.lang.Exception: State key serializer has not been >> configured in the config. This operation cannot use partitioned state. >> at >> org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199) >> at >> org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260) >> at >> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:129) >> ... 8 more >> 2016-06-22 20:41:38,815 INFO org.apache.flink.runtime.taskmanager.Task >> - Freeing ta >> >> -- >> Jacob Bay Larsen >> >> Phone: +45 6133 1108 >> E-mail: [hidden email] > > > |
Free forum by Nabble | Edit this page |