type error with generics ..

Posted by Debasish Ghosh on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/type-error-with-generics-tp29605.html

Hello -

I have the following call to addSource where I pass a Custom SourceFunction ..

env.<Data>addSource(
  new CollectionSourceFunctionJ<Data>(data, TypeInformation.<Data>of(new TypeHint<Data>(){}))
)

where data is List<Data> and CollectionSourceFunctionJ is a Scala case class ..

case class CollectionSourceFunctionJ[T](data: java.util.List[T], ti: TypeInformation[T]) extends SourceFunction[T] {
  def cancel(): Unit = {}
  def run(ctx: SourceContext[T]): Unit = {
    data.asScala.foreach(d ⇒ ctx.collect(d))
  }
}


When the following transformation runs ..

DataStream<Data> ins = readStream(in, Data.class, serdeData);
DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName())).returns(new TypeHint<Simple>(){}.getTypeInfo());


I get the following exception in the second line ..

org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'Custom Source' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.

Initially the returns call was not there and I was getting the same exception. Now after adding the returns call, nothing changes.

Any help will be appreciated ..

regards.

--