Problems with type erasure

Vincenzo Pronestì

Hi there,

I need to execute the following code:

 72: KeyedStream<Tuple2<String, Double>, String> keyedDelays = delays
 73:         .flatMap(new Query1FlatMap())
 74:         .keyBy(item -> item.f0);

but I keep getting this error message:

	The program finished with the following exception:

	The return type of function 'Custom Source' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
		org.apache.flink.api.dag.Transformation.getOutputType(Transformation.java:451)
		org.apache.flink.streaming.api.datastream.DataStream.getType(DataStream.java:178)
		org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:635)
		org.apache.flink.nyschoolbuses.Query2.main(Query2.java:73)

I've read this guide https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html (it has an example with Tuple2<String, Double>, which is the same type I need) and I think I have two options:
1 - Implement ResultTypeQueryable<Tuple2<String, Double>> in my Query1FlatMap class. I did this by adding:
    @Override
    public TypeInformation<Tuple2<String, Double>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple2<String, Double>>(){});
    }

2 - Use the returns method right after the flatMap(new Query1FlatMap()), like this:
    TypeInformation<Tuple2<String, Double>> tInfo = TypeInformation.of(new TypeHint<Tuple2<String, Double>>(){});
    KeyedStream<Tuple2<String, Double>, String> keyedDelays = delays
            .flatMap(new Query1FlatMap()).returns(tInfo)
            .keyBy(item -> item.f0);
Actually I've also tried with:
    TypeHint<Tuple2<String, Double>> tHint = new TypeHint<Tuple2<String, Double>>(){};
    KeyedStream<Tuple2<String, Double>, String> keyedDelays = delays
            .flatMap(new Query1FlatMap()).returns(tHint)
            .keyBy(item -> item.f0);
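
A side note on the `new TypeHint<...>(){}` syntax used in both options: Java erases generic type arguments at runtime, but an anonymous subclass keeps its generic superclass in the class metadata, and a type hint relies on that to carry the full type. Below is a minimal plain-Java sketch of the idea. `Hint` is a hypothetical stand-in written for illustration only; it is not Flink's actual `TypeHint` implementation.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TypeTokenSketch {

    /** Hypothetical stand-in for a type hint; NOT Flink's TypeHint class. */
    abstract static class Hint<T> {
        private final Type captured;

        protected Hint() {
            // An anonymous subclass records "Hint<concrete type>" as its generic
            // superclass, so reflection can read the type argument back.
            ParameterizedType superType =
                    (ParameterizedType) getClass().getGenericSuperclass();
            this.captured = superType.getActualTypeArguments()[0];
        }

        Type getCaptured() {
            return captured;
        }
    }

    /** Erasure: both lists share one runtime class, the element type is gone. */
    public static boolean sameRuntimeClass() {
        List<String> strings = new ArrayList<>();
        List<Integer> ints = new ArrayList<>();
        return strings.getClass() == ints.getClass();
    }

    /** The trailing {} creates the anonymous subclass that preserves the type. */
    public static String capturedTypeName() {
        Hint<Map<String, List<Double>>> hint = new Hint<Map<String, List<Double>>>() {};
        return hint.getCaptured().getTypeName();
    }

    public static void main(String[] args) {
        System.out.println("Same runtime class: " + sameRuntimeClass());
        System.out.println("Captured type: " + capturedTypeName());
    }
}
```

Nested generic arguments are captured as well, which is why the nesting depth of the type should not matter for a hint built this way.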

The problem is that none of these approaches works, and the error message is always the same as above. Does anyone know how I can fix this?
Also, I'm having the same issue in another piece of code where the keyed stream involves nested tuples (i.e. KeyedStream<Tuple2<Tuple2<String, String>, Integer>, Tuple>). Would the solution work in that case too, or do I need to change something because of the nested Tuple2?

Thank you for your attention.
Best regards,
Vincenzo

Re: Problems with type erasure

Yun Gao
Hi Vincenzo:

    Could you also attach the code before line 72, namely how `delays` is defined? The exception says the return type of "Custom Source" could not be determined, which I think refers to `delays`: the exception is thrown when an operator is called on `delays` and Flink tries to create a new transformation based on the type information of `delays`.
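
To illustrate the point, here is a toy model in plain Java. It is a sketch under assumptions, not Flink's real classes: `ToyStream` and its methods are hypothetical names. It mimics what the stack trace suggests, namely that `flatMap` asks the input stream for its type before the new operator exists, so a source without a type fails right there and a `returns(...)` placed after `flatMap` is never reached.

```java
public class EagerTypeCheckSketch {

    /** Toy stream: an operator name plus a description of its element type. */
    static final class ToyStream {
        private final String name;
        private final String elementType; // null means "could not be determined"

        ToyStream(String name, String elementType) {
            this.name = name;
            this.elementType = elementType;
        }

        String getType() {
            if (elementType == null) {
                throw new IllegalStateException(
                        "The return type of function '" + name + "' could not be determined");
            }
            return elementType;
        }

        /** Building a downstream operator first asks the INPUT stream for its type. */
        ToyStream flatMap(String operatorName) {
            getType(); // throws here if this (upstream) stream has no type
            return new ToyStream(operatorName, null);
        }

        /** Attaches a type hint to THIS stream only, never to its input. */
        ToyStream returns(String hint) {
            return new ToyStream(name, hint);
        }
    }

    /** Runs source -> flatMap -> returns and reports the result type or the error. */
    static String run(ToyStream source) {
        try {
            return source.flatMap("Flat Map").returns("Tuple2<String, Double>").getType();
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Untyped source: the error names the source, not the flatMap.
        System.out.println(run(new ToyStream("Custom Source", null)));
        // Typed source: the hint on the flatMap result is reached and used.
        System.out.println(run(new ToyStream("Custom Source", "Delay")));
    }
}
```

In this model the only way out is to give the source itself a type, which is why the definition of `delays` matters here.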

Best,
 Yun

Re: Problems with type erasure

Arvid Heise-3
Hi Vincenzo,

The preferred way to get the type information for tuples is to use org.apache.flink.api.common.typeinfo.Types. For Tuple2<Tuple2<String, String>, Integer>, you'd write:
Types.TUPLE(Types.TUPLE(Types.STRING, Types.STRING), Types.INT)
Nested tuples are not an issue in general.


--

Arvid Heise | Senior Java Developer


Re: Problems with type erasure

Vincenzo Pronestì
In reply to this post by Yun Gao

Hi Yun,

after reading your message I checked the source and managed to fix the problem. So thank you Yun.

In case someone has the same problem: the source is a Kafka consumer, and as such it needs a class that implements DeserializationSchema. One of the required methods is getProducedType. In my case:

@Override
public TypeInformation<Delay> getProducedType() {
    return TypeInformation.of(Delay.class);
}

After implementing this method I was able to remove the TypeInformation from the flatMap function call.

Thank you.

Best regards,

Vincenzo
