Extract type information from SortedMap

Yukun Guo
Hi,
When I run code that implements a generic FlatMapFunction, Flink throws an InvalidTypesException:

public class GenericFlatMapper<T> implements FlatMapFunction<SortedMap<T, Long>, Tuple2<T, Long>> {
    @Override
    public void flatMap(SortedMap<T, Long> m, Collector<Tuple2<T, Long>> out) throws Exception {
        for (Map.Entry<T, Long> entry : m.entrySet()) {
            out.collect(Tuple2.of(entry.getKey(), entry.getValue()));
        }
    }
}
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
...
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'T' in 'class GenericFlatMapper' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).

This puzzles me, as Flink should be able to infer the type from the arguments. I know about returns(...) and other workarounds for giving a type hint, but they are rather verbose. Any suggestions?

Re: Extract type information from SortedMap

rmetzger0
Hi Yukun,

Can you also post the code that shows how you invoke the GenericFlatMapper to the mailing list?

The Java compiler usually drops generic types during compilation ("type erasure"), which is why we cannot infer the types.
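To make the erasure point concrete, here is a minimal sketch that uses only JDK reflection. The names ErasureDemo, Mapper and GenericMapper are hypothetical stand-ins chosen for illustration; they are not Flink classes, and this is not how Flink's type extraction is implemented.

```java
import java.lang.reflect.ParameterizedType;

class ErasureDemo {
    // Hypothetical stand-in for a function interface such as FlatMapFunction.
    interface Mapper<IN, OUT> { }

    // Generic implementation, similar in shape to GenericFlatMapper<T>.
    static class GenericMapper<T> implements Mapper<T, T> { }

    // Name of the first type argument that the class file records for Mapper.
    static String recordedInputType(Class<?> mapperClass) {
        ParameterizedType iface =
                (ParameterizedType) mapperClass.getGenericInterfaces()[0];
        return iface.getActualTypeArguments()[0].getTypeName();
    }

    // Both instances share one runtime class; <String> and <Long> are erased.
    static boolean sameRuntimeClass() {
        return new GenericMapper<String>().getClass()
                == new GenericMapper<Long>().getClass();
    }

    public static void main(String[] args) {
        System.out.println(sameRuntimeClass());                     // true
        System.out.println(recordedInputType(GenericMapper.class)); // T
    }
}
```

The class file only knows that GenericMapper implements Mapper<T, T>. The String in new GenericMapper<String>() exists at the call site during compilation and is not attached to the instance, so reflection on the object alone cannot recover it.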



Re: Extract type information from SortedMap

Yukun Guo
Hi Robert,

On 9 July 2016 at 00:25, Robert Metzger <[hidden email]> wrote:
Hi Yukun,

Can you also post the code that shows how you invoke the GenericFlatMapper to the mailing list?

Here is the code defining the topology:
DataStream<String> stream = ...;
stream
        .keyBy(new KeySelector<String, Integer>() {
            @Override
            public Integer getKey(String x) throws Exception {
                return x.hashCode() % 10;
            }
        })
        .timeWindow(Time.seconds(10))
        .fold(new TreeMap<String, Long>(), new FoldFunction<String, SortedMap<String, Long>>() {
            @Override
            public SortedMap<String, Long> fold(SortedMap<String, Long> map,
                                                String x) {
                Long current = map.get(x);
                Long updated = current != null ? current + 1 : 1;
                map.put(x, updated);
                return map;
            }
        })
        .flatMap(new GenericFlatMapper<String>())
        .returns(new TypeHint<Tuple2<String, Long>>(){}.getTypeInfo())  // throws InvalidTypesException if you comment out this line
        .print();
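The TypeHint in the returns(...) call above is written with a trailing {} so that it becomes an anonymous subclass, and a subclass does record its generic superclass in the class file. A minimal JDK-only sketch of that general pattern follows; Hint and TypeTokenDemo are hypothetical names used for illustration and are not Flink's TypeHint.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.SortedMap;

class TypeTokenDemo {
    // Hypothetical, simplified analogue of a type hint class.
    abstract static class Hint<T> {
        private final Type captured;

        protected Hint() {
            // An anonymous subclass stores "Hint<...>" as its generic superclass.
            // This cast assumes a direct subclass with a concrete type argument.
            ParameterizedType superType =
                    (ParameterizedType) getClass().getGenericSuperclass();
            this.captured = superType.getActualTypeArguments()[0];
        }

        Type getType() {
            return captured;
        }
    }

    // Captures the full generic type, including its type arguments.
    static Type capture() {
        return new Hint<SortedMap<String, Long>>() { }.getType();
    }

    public static void main(String[] args) {
        ParameterizedType p = (ParameterizedType) capture();
        System.out.println(p.getRawType() == SortedMap.class);             // true
        System.out.println(p.getActualTypeArguments()[0] == String.class); // true
    }
}
```

Without the braces there would be no subclass, and the type argument would be erased just like the T of GenericFlatMapper<String>.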
 

The Java compiler usually drops generic types during compilation ("type erasure"), which is why we cannot infer the types.


The error message implies type extraction should be possible when "all variables in the return type can be deduced from the input type(s)". This is true for flatMap(Tuple2<Long, T>, Collector<Tuple2<T, String>>), but if the signature is changed to void flatMap(SortedMap<T, Long>, Collector<Tuple2<T, Long>>), type inference fails.
 

Re: Extract type information from SortedMap

Timo Walther
Hi Yukun,

I think the problem with the input type inference is that SortedMap is treated as a GenericType and not as a Flink native type (such as a Tuple or POJO). This case is not supported at the moment. You can create an issue if you like; maybe there is a way to support this special type inference case.

Timo
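For readers wondering what "deducing T from the input type" could look like mechanically, here is a rough JDK-only sketch. It is not Flink's TypeExtractor and says nothing about how Flink would support this case; FlatMapper, Pair, TypeToken and all method names are hypothetical and exist only for this illustration.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;

class TypeVariableBinding {
    // Hypothetical stand-ins for FlatMapFunction and Tuple2.
    interface FlatMapper<IN, OUT> { }
    static class Pair<A, B> { }
    static class GenericFlatMapper<T>
            implements FlatMapper<SortedMap<T, Long>, Pair<T, Long>> { }

    // Captures a full generic type through an anonymous subclass.
    abstract static class TypeToken<X> {
        Type type() {
            return ((ParameterizedType) getClass().getGenericSuperclass())
                    .getActualTypeArguments()[0];
        }
    }

    // Walks the declared and actual types in parallel and records each variable.
    static void bind(Type declared, Type actual, Map<String, Type> bindings) {
        if (declared instanceof TypeVariable) {
            bindings.put(((TypeVariable<?>) declared).getName(), actual);
        } else if (declared instanceof ParameterizedType
                && actual instanceof ParameterizedType) {
            ParameterizedType d = (ParameterizedType) declared;
            ParameterizedType a = (ParameterizedType) actual;
            if (!d.getRawType().equals(a.getRawType())) {
                return; // this sketch does not handle subtypes such as TreeMap
            }
            Type[] declaredArgs = d.getActualTypeArguments();
            Type[] actualArgs = a.getActualTypeArguments();
            for (int i = 0; i < declaredArgs.length; i++) {
                bind(declaredArgs[i], actualArgs[i], bindings);
            }
        }
    }

    // Matches the declared input type of the function against the actual one.
    static Map<String, Type> inferFromInput(Class<?> functionClass, Type actualInput) {
        ParameterizedType iface =
                (ParameterizedType) functionClass.getGenericInterfaces()[0];
        Map<String, Type> bindings = new HashMap<>();
        bind(iface.getActualTypeArguments()[0], actualInput, bindings);
        return bindings;
    }

    // Binds T for GenericFlatMapper given an input of SortedMap<String, Long>.
    static Map<String, Type> demo() {
        Type input = new TypeToken<SortedMap<String, Long>>() { }.type();
        return inferFromInput(GenericFlatMapper.class, input);
    }

    public static void main(String[] args) {
        System.out.println(demo().get("T") == String.class);
    }
}
```

The sketch only works when the full parameterized input type is known, which is the piece of information that is lost once the input is handled as an opaque generic type.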


-- 
Freundliche Grüße / Kind Regards

Timo Walther 
