Hello,
I am utilizing the code snippet in:
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html and particularly ‘open’ function in my code:
@Override
public
void
open(Configuration
config)
{
ValueStateDescriptor<Tuple2<Long,
Long>>
descriptor
=
new
ValueStateDescriptor<>(
"average",
// the state name
TypeInformation.of(new
TypeHint<Tuple2<Long,
Long>>()
{}),
// type information
Tuple2.of(0L,
0L));
// default value of the state, if nothing was set
sum
=
getRuntimeContext().getState(descriptor);
}
When I run, I get the following error:
Caused by: java.lang.RuntimeException: Error while getting state
at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:120)
at wikiedits.stateful$Rec2Tuple2.open(stateful.java:103)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:41)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:314)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:214)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: State key serializer has not been configured in the config. This operation cannot use partitioned state.
at org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:118)
... 8 more
Where do I define the key & value serializer for state?
Thanks,
Buvana
Free forum by Nabble | Edit this page |