State & Generics

Laurent Exsteens
Hello,

Using Flink 1.8.1, I'm getting the following error:
     The TypeHint is using a generic variable.This is not supported, generic types must be fully specified for the TypeHint.
when trying to create a ListStateDescriptor with a generic type (full sample code in attachment):

    public class AND<LEFT, RIGHT> extends RichCoFlatMapFunction<LEFT, RIGHT, Tuple2<LEFT, RIGHT>> {

        private transient ListState<LEFT> leftState;
        private transient ListState<RIGHT> rightState;

        @Override
        public void open(Configuration config) {
            ListStateDescriptor<LEFT> left_descriptor =
                new ListStateDescriptor<>(
                    "and_left",
                    TypeInformation.of(new TypeHint<LEFT>() {
                    }));
            leftState = getRuntimeContext().getListState(left_descriptor);

This gives me the following stack trace:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
at eu.euranova.leadcep.Main.main(Main.java:61)
Caused by: org.apache.flink.util.FlinkRuntimeException: The TypeHint is using a generic variable.This is not supported, generic types must be fully specified for the TypeHint.
at org.apache.flink.api.common.typeinfo.TypeHint.<init>(TypeHint.java:54)
at eu.euranova.leadcep.AND$1.<init>(AND.java:22)
at eu.euranova.leadcep.AND.open(AND.java:19)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.open(CoStreamFlatMap.java:46)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)



Googling the error didn't give a working solution.

Is there a way to work with state using generic types? If so, how?

Thanks in advance for your help!

Best Regards,

Laurent.

--
Laurent Exsteens
Data Engineer
(M) +32 (0) 486 20 48 36

EURA NOVA

Rue Emile Francqui, 4

1435 Mont-Saint-Guibert

(T) +32 10 75 02 00

euranova.eu

research.euranova.eu


 Be green, keep it on the screen

Attachments: Main.java (2K), AND.java (2K), TestType.java (586 bytes)
Re: State & Generics

Mike Mintz
I was able to get generic types to work when I used GenericTypeInfo and made sure to wrap the generic in some concrete type. In my case I used scala.Some as the wrapper. It looks something like this (in Scala):

import org.apache.flink.api.java.typeutils.GenericTypeInfo
val descriptor = new ListStateDescriptor[Some[T]]("blah", new GenericTypeInfo(classOf[Some[T]]))

Since the descriptor is Some[T] instead of T, I had to wrap and unwrap it every time I used it.

Re: State & Generics

Laurent Exsteens
Hello Mike,

Thanks for the info.
I tried to do something similar in Java. I'm not there yet, but I think it should be feasible.

However, as you said, that means additional operations for each event.

Yesterday I managed to find another solution: create the type information outside of the class and pass it to the constructor. I can retrieve the type information from DataStream.getType(). This works well, and is acceptable in my case.
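
For readers following along, here is a minimal sketch of what that constructor-based approach might look like. It is not the code from the attachments: the constructor parameters, the "and_right" state name, and the pairing logic in flatMap1/flatMap2 are assumptions made only for illustration. It relies on TypeInformation being Serializable, so the instances can travel with the function to the task managers.

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

class AND<LEFT, RIGHT> extends RichCoFlatMapFunction<LEFT, RIGHT, Tuple2<LEFT, RIGHT>> {

    // Type information is created by the caller, where the concrete types are known.
    private final TypeInformation<LEFT> leftType;
    private final TypeInformation<RIGHT> rightType;

    private transient ListState<LEFT> leftState;
    private transient ListState<RIGHT> rightState;

    AND(TypeInformation<LEFT> leftType, TypeInformation<RIGHT> rightType) {
        this.leftType = leftType;
        this.rightType = rightType;
    }

    @Override
    public void open(Configuration config) {
        // No TypeHint needed here: the descriptors reuse the injected type information.
        leftState = getRuntimeContext().getListState(
                new ListStateDescriptor<>("and_left", leftType));
        // "and_right" is an assumed name, chosen to mirror "and_left".
        rightState = getRuntimeContext().getListState(
                new ListStateDescriptor<>("and_right", rightType));
    }

    @Override
    public void flatMap1(LEFT left, Collector<Tuple2<LEFT, RIGHT>> out) throws Exception {
        // Illustrative logic only: buffer the element and pair it with buffered right elements.
        leftState.add(left);
        for (RIGHT right : rightState.get()) {
            out.collect(Tuple2.of(left, right));
        }
    }

    @Override
    public void flatMap2(RIGHT right, Collector<Tuple2<LEFT, RIGHT>> out) throws Exception {
        // Illustrative logic only: buffer the element and pair it with buffered left elements.
        rightState.add(right);
        for (LEFT left : leftState.get()) {
            out.collect(Tuple2.of(left, right));
        }
    }
}
```

At the call site this would be constructed as something like new AND<>(leftStream.getType(), rightStream.getType()), where leftStream and rightStream are hypothetical names for the two input DataStreams. Keyed state is only available on keyed streams, so the connected streams would need to be keyed first, and depending on the setup the output type may also have to be declared explicitly.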

I'm starting to understand that the problem lies in Java's generic type erasure: we cannot create the TypeInformation using a TypeHint inside the generic class, because creating the TypeHint is a runtime operation, and by then the concrete type behind the type parameter is no longer available; it was erased at compile time.
Is my understanding correct?
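
The effect can be reproduced with the JDK alone. In the sketch below, Hint is a hypothetical stand-in that reads its type argument from the generic superclass of an anonymous subclass; it is not Flink's TypeHint, only an assumption about the general mechanism. When the anonymous subclass is created with a concrete type, that type is recorded. When it is created inside a generic class, only the type variable is recorded, which is the situation the error message complains about.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

// Hypothetical stand-in for a TypeHint-like class (not Flink's implementation).
abstract class Hint<T> {
    final Type captured;

    protected Hint() {
        // The anonymous subclass records its generic superclass, e.g. Hint<String>.
        ParameterizedType superType =
                (ParameterizedType) getClass().getGenericSuperclass();
        this.captured = superType.getActualTypeArguments()[0];
    }

    // True when only a type variable (such as E) was recorded, not a concrete type.
    boolean isTypeVariable() {
        return captured instanceof TypeVariable;
    }
}

// A generic class that tries to create a hint for its own type parameter.
class Holder<E> {
    Hint<E> makeHint() {
        return new Hint<E>() { };
    }
}

class ErasureDemo {
    public static void main(String[] args) {
        // Created where the concrete type is written in the source.
        Hint<String> concrete = new Hint<String>() { };
        System.out.println(concrete.captured.getTypeName()); // java.lang.String

        // Created inside the generic class: String is not visible there.
        Hint<String> generic = new Holder<String>().makeHint();
        System.out.println(generic.captured.getTypeName());  // E
        System.out.println(generic.isTypeVariable());        // true
    }
}
```

This is why handing the type information in from a place where the concrete types are known sidesteps the problem.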

I'm still interested in a solution where everything could be done inside the flatMap function, without the extra wrapping and unwrapping (although I'm starting to think that is not possible because of type erasure).

Regards,

Laurent.




Re: State & Generics

Aljoscha Krettek
Hi Laurent!

On 31.03.20 10:43, Laurent Exsteens wrote:
> Yesterday I managed to find another solution: create the type information
> outside of the class and pass it to the constructor. I can retrieve the
> type information from DataStream.getType(). This works well, and is
> acceptable in my case.

This is a valid solution which is also used internally in some parts of
Flink.

> I'm starting to understand that the problem resides in Java Generics type
> erasure: we cannot create the TypeInformation using a TypeHint inside the
> Generic class, since creating the TypeHint is a runtime operation which
> cannot access the generic type anymore, since it has been erased at compile
> time.
> Is my understanding correct?

Yes, this seems correct.

>
> I'm still interested in a solution where everything could be done inside
> the flatMap function, and without extra wrap and unwrap (although I'm
> starting to think that is not possible due to type erasure).

It would be possible if you wrote a completely custom TypeSerializer,
but that is probably a lot more effort than it would be worth,
considering you also found the alternative solution.

Best,
Aljoscha
Re: State & Generics

Laurent Exsteens
Hi Aljoscha,

Thank you for your answer!

Out of curiosity, would writing my own serializer involve implementing serialization for every type I could get?
