Type erasure exception

gdibernardo
Hi guys,

When I run my Flink topology (locally) I get this error:

The return type of function 'main(Job.java:69)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.

My code is:
FlinkKafkaConsumer010<String> controlsConsumer =
    new FlinkKafkaConsumer010<String>(Topics.CONTROL_TOPIC, new SimpleStringSchema(), properties);

DataStream<ControlMessage> controlStream = environment.addSource(controlsConsumer)
    .rebalance()
    .map(value -> {
        ControlMessage message = new ControlMessage();
        message.initFromJSON(value);
        return message;
    });

KeyedStream<Tuple2<String, ControlMessage>, Tuple> keyed = controlStream
    .map(message -> new Tuple2<String, ControlMessage>(message.getExpressionId(), message))
    .keyBy(0);

The exception is raised in the second map (before the keyBy() operation). From my understanding, I have to call returns(…) after the map function, but I can’t really understand why. Could someone please explain why this happens?

Thank you so much in advance.

Best,


Gabriele
Re: Type erasure exception

mmziyad
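Hi Gabriele,

Flink cannot extract the types of Java 8 lambdas when the code is compiled with javac, which is what IntelliJ IDEA uses by default. javac does not keep the generic type information of a lambda, so a generic return type such as Tuple2<String, ControlMessage> is erased. Your first map works because its return type, ControlMessage, is not generic.
You can solve the issue with one of the options below.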

1. Provide type hints

2. Set Eclipse JDT as the compiler:

3. Avoid using lambdas :)
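
To make options 1 and 3 a bit more concrete, here is a minimal plain-Java sketch of the underlying effect. It is not Flink's actual type-extraction logic, only a simplified reflection check. The names ErasureDemo, Hint and outputTypeOf are hypothetical, Map.Entry<String, Integer> stands in for Tuple2<String, ControlMessage>, and Hint is only a rough imitation of what a type hint does.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.function.Function;

public class ErasureDemo {

    /** Hypothetical stand-in for a type hint: an anonymous subclass records T in its class file. */
    public abstract static class Hint<T> {
        public Type captured() {
            ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
            return superType.getActualTypeArguments()[0];
        }
    }

    /** Returns the declared output type of fn, or null if its class does not record one. */
    public static Type outputTypeOf(Function<?, ?> fn) {
        for (Type iface : fn.getClass().getGenericInterfaces()) {
            if (iface instanceof ParameterizedType) {
                ParameterizedType p = (ParameterizedType) iface;
                if (p.getRawType() == Function.class) {
                    return p.getActualTypeArguments()[1];
                }
            }
        }
        return null;
    }

    /** Lambda version: the generated class only implements the raw Function interface. */
    public static Function<String, Map.Entry<String, Integer>> asLambda() {
        return s -> new SimpleEntry<>(s, s.length());
    }

    /** Anonymous-class version (option 3): the generic signature is kept in the class file. */
    public static Function<String, Map.Entry<String, Integer>> asAnonymousClass() {
        return new Function<String, Map.Entry<String, Integer>>() {
            @Override
            public Map.Entry<String, Integer> apply(String s) {
                return new SimpleEntry<>(s, s.length());
            }
        };
    }

    /** Explicit hint (option 1): the type is supplied separately from the function. */
    public static Type hinted() {
        return new Hint<Map.Entry<String, Integer>>() {}.captured();
    }

    public static void main(String[] args) {
        System.out.println("lambda:          " + outputTypeOf(asLambda()));
        System.out.println("anonymous class: " + outputTypeOf(asAnonymousClass()));
        System.out.println("explicit hint:   " + hinted());
    }
}
```

The lambda reports no output type, while the anonymous class and the explicit hint both report the full parameterized type. In the Flink job, the corresponding hint would probably be something like .returns(new TypeHint<Tuple2<String, ControlMessage>>() {}) right after the second map and before keyBy(0), assuming your Flink version provides returns(TypeHint).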

Hope this helps.

Best
Ziyad

In reply to the message from Gabriele Di Bernardo <[hidden email]>, Mon, Jul 24, 2017 at 8:42 PM.