Can ValueState use generics?

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Can ValueState use generics?

yunfan123
My process function is like :

    private static class MergeFunction extends RichProcessFunction<Tuple2<Integer, ObjectNode>, Tuple2<Integer, ObjectNode>> {

        private ValueState<Tuple2<Integer, ObjectNode>> state;

        @Override
        @SuppressWarnings("unchecked")
        public void open(Configuration parameters) throws Exception {
            state = getRuntimeContext().getState(new ValueStateDescriptor<>("mystate",
                    (Class<Tuple2<Integer,ObjectNode>>) (Object)Tuple2.class));
        }
}


When I running the code:
05/07/2017 21:17:47 Process -> (Sink: Unnamed, Sink: Unnamed)(1/1) switched to FAILED
java.lang.RuntimeException: Cannot create full type information based on the given class. If the type has generics, please
        at org.apache.flink.api.common.state.StateDescriptor.<init>(StateDescriptor.java:124)
        at org.apache.flink.api.common.state.ValueStateDescriptor.<init>(ValueStateDescriptor.java:101)
        at com.bytedance.flinkjob.activationSource.AppActivationSource$MergeFunction.open(AppActivationSource.java:134)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
        at org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:55)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:375)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Tuple needs to be parameterized by using generics.
        at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:673)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:607)
        at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:561)
        at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:557)
        at org.apache.flink.api.common.state.StateDescriptor.<init>(StateDescriptor.java:122)
        ... 9 more

Can I use generics with ValueState?
Reply | Threaded
Open this post in threaded view
|

Re: Can ValueState use generics?

Chesnay Schepler
If you want to use generics you have to either provide a TypeInformation
instead of a class or create a class that extends Tuple2(Integer,
ObjectNode) and use it as the class argument.

On 07.05.2017 15:14, yunfan123 wrote:

> My process function is like :
>
>      private static class MergeFunction extends
> RichProcessFunction<Tuple2&lt;Integer, ObjectNode>, Tuple2<Integer,
> ObjectNode>> {
>
>          private ValueState<Tuple2&lt;Integer, ObjectNode>> state;
>
>          @Override
>          @SuppressWarnings("unchecked")
>          public void open(Configuration parameters) throws Exception {
>              state = getRuntimeContext().getState(new
> ValueStateDescriptor<>("mystate",
>                      (Class<Tuple2&lt;Integer,ObjectNode>>)
> (Object)Tuple2.class));
>          }
> }
>
>
> When I running the code:
> 05/07/2017 21:17:47 Process -> (Sink: Unnamed, Sink: Unnamed)(1/1) switched
> to FAILED
> java.lang.RuntimeException: Cannot create full type information based on the
> given class. If the type has generics, please
> at
> org.apache.flink.api.common.state.StateDescriptor.<init>(StateDescriptor.java:124)
> at
> org.apache.flink.api.common.state.ValueStateDescriptor.<init>(ValueStateDescriptor.java:101)
> at
> com.bytedance.flinkjob.activationSource.AppActivationSource$MergeFunction.open(AppActivationSource.java:134)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
> at
> org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:55)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:375)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
> Tuple needs to be parameterized by using generics.
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:673)
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:607)
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:561)
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:557)
> at
> org.apache.flink.api.common.state.StateDescriptor.<init>(StateDescriptor.java:122)
> ... 9 more
>
> Can I use generics with ValueState?
>
>
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-ValueState-use-generics-tp13038.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
>

Reply | Threaded
Open this post in threaded view
|

Re: Can ValueState use generics?

Stephan Ewen
Please use 

"new ValueStateDescriptor<>("mystate", TypeInformation.of(new TypeHint<Tuple2<Integer,ObjectNode>>(){}));

That should work...

On Mon, May 8, 2017 at 1:11 PM, Chesnay Schepler <[hidden email]> wrote:
If you want to use generics you have to either provide a TypeInformation instead of a class or create a class that extends Tuple2(Integer, ObjectNode) and use it as the class argument.


On <a href="tel:07.05.2017%2015" value="+49705201715" target="_blank">07.05.2017 15:14, yunfan123 wrote:
My process function is like :

     private static class MergeFunction extends
RichProcessFunction<Tuple2&lt;Integer, ObjectNode>, Tuple2<Integer,
ObjectNode>> {

         private ValueState<Tuple2&lt;Integer, ObjectNode>> state;

         @Override
         @SuppressWarnings("unchecked")
         public void open(Configuration parameters) throws Exception {
             state = getRuntimeContext().getState(new
ValueStateDescriptor<>("mystate",
                     (Class<Tuple2&lt;Integer,ObjectNode>>)
(Object)Tuple2.class));
         }
}


When I running the code:
05/07/2017 21:17:47     Process -> (Sink: Unnamed, Sink: Unnamed)(1/1) switched
to FAILED
java.lang.RuntimeException: Cannot create full type information based on the
given class. If the type has generics, please
        at
org.apache.flink.api.common.state.StateDescriptor.<init>(StateDescriptor.java:124)
        at
org.apache.flink.api.common.state.ValueStateDescriptor.<init>(ValueStateDescriptor.java:101)
        at
com.bytedance.flinkjob.activationSource.AppActivationSource$MergeFunction.open(AppActivationSource.java:134)
        at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
        at
org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:55)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:375)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
Tuple needs to be parameterized by using generics.
        at
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:673)
        at
org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:607)
        at
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:561)
        at
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:557)
        at
org.apache.flink.api.common.state.StateDescriptor.<init>(StateDescriptor.java:122)
        ... 9 more

Can I use generics with ValueState?



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-ValueState-use-generics-tp13038.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.