My process function is like :
private static class MergeFunction extends RichProcessFunction<Tuple2<Integer, ObjectNode>, Tuple2<Integer, ObjectNode>> { private ValueState<Tuple2<Integer, ObjectNode>> state; @Override @SuppressWarnings("unchecked") public void open(Configuration parameters) throws Exception { state = getRuntimeContext().getState(new ValueStateDescriptor<>("mystate", (Class<Tuple2<Integer,ObjectNode>>) (Object)Tuple2.class)); } } When I running the code: 05/07/2017 21:17:47 Process -> (Sink: Unnamed, Sink: Unnamed)(1/1) switched to FAILED java.lang.RuntimeException: Cannot create full type information based on the given class. If the type has generics, please at org.apache.flink.api.common.state.StateDescriptor.<init>(StateDescriptor.java:124) at org.apache.flink.api.common.state.ValueStateDescriptor.<init>(ValueStateDescriptor.java:101) at com.bytedance.flinkjob.activationSource.AppActivationSource$MergeFunction.open(AppActivationSource.java:134) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112) at org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:55) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:375) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Tuple needs to be parameterized by using generics. at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:673) at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:607) at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:561) at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:557) at org.apache.flink.api.common.state.StateDescriptor.<init>(StateDescriptor.java:122) ... 9 more Can I use generics with ValueState? |
If you want to use generics you have to either provide a TypeInformation
instead of a class or create a class that extends Tuple2(Integer, ObjectNode) and use it as the class argument. On 07.05.2017 15:14, yunfan123 wrote: > My process function is like : > > private static class MergeFunction extends > RichProcessFunction<Tuple2<Integer, ObjectNode>, Tuple2<Integer, > ObjectNode>> { > > private ValueState<Tuple2<Integer, ObjectNode>> state; > > @Override > @SuppressWarnings("unchecked") > public void open(Configuration parameters) throws Exception { > state = getRuntimeContext().getState(new > ValueStateDescriptor<>("mystate", > (Class<Tuple2<Integer,ObjectNode>>) > (Object)Tuple2.class)); > } > } > > > When I running the code: > 05/07/2017 21:17:47 Process -> (Sink: Unnamed, Sink: Unnamed)(1/1) switched > to FAILED > java.lang.RuntimeException: Cannot create full type information based on the > given class. If the type has generics, please > at > org.apache.flink.api.common.state.StateDescriptor.<init>(StateDescriptor.java:124) > at > org.apache.flink.api.common.state.ValueStateDescriptor.<init>(ValueStateDescriptor.java:101) > at > com.bytedance.flinkjob.activationSource.AppActivationSource$MergeFunction.open(AppActivationSource.java:134) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112) > at > org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:55) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:375) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.apache.flink.api.common.functions.InvalidTypesException: > Tuple needs to be parameterized by using generics. > at > org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:673) > at > org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:607) > at > org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:561) > at > org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:557) > at > org.apache.flink.api.common.state.StateDescriptor.<init>(StateDescriptor.java:122) > ... 9 more > > Can I use generics with ValueState? > > > > -- > View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-ValueState-use-generics-tp13038.html > Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com. > |
Please use "new ValueStateDescriptor<>(" That should work... On Mon, May 8, 2017 at 1:11 PM, Chesnay Schepler <[hidden email]> wrote: If you want to use generics you have to either provide a TypeInformation instead of a class or create a class that extends Tuple2(Integer, ObjectNode) and use it as the class argument. |
Free forum by Nabble | Edit this page |