Re: type error with generics ..

Posted by Debasish Ghosh on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/type-error-with-generics-tp29605p29623.html

Thanks .. I tried this ..

DataStream<Data> ins = readStream(in, Data.class, serdeData).map((Data d) -> d).returns(new TypeHint<Data>(){}.getTypeInfo());

But still get the same error on this line ..

(BTW I am not sure how to invoke returns on a DataStream and hence had to do a fake map - any suggestions here ?)

regards.

On Sat, Aug 24, 2019 at 10:26 PM Rong Rong <[hidden email]> wrote:
Hi Debasish,

I think the error refers to the output of your source instead of your result of the map function. E.g.
DataStream<Data> ins = readStream(in, Data.class, serdeData).returns(new TypeInformation<Data>);
DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName())).returns(new TypeHint<Simple>(){}.getTypeInfo());

--
Rong

On Fri, Aug 23, 2019 at 9:55 AM Debasish Ghosh <[hidden email]> wrote:
Hello -

I have the following call to addSource where I pass a Custom SourceFunction ..

env.<Data>addSource(
  new CollectionSourceFunctionJ<Data>(data, TypeInformation.<Data>of(new TypeHint<Data>(){}))
)

where data is List<Data> and CollectionSourceFunctionJ is a Scala case class ..

case class CollectionSourceFunctionJ[T](data: java.util.List[T], ti: TypeInformation[T]) extends SourceFunction[T] {
  def cancel(): Unit = {}
  def run(ctx: SourceContext[T]): Unit = {
    data.asScala.foreach(d ⇒ ctx.collect(d))
  }
}


When the following transformation runs ..

DataStream<Data> ins = readStream(in, Data.class, serdeData);
DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName())).returns(new TypeHint<Simple>(){}.getTypeInfo());


I get the following exception in the second line ..

org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'Custom Source' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.

Initially the returns call was not there and I was getting the same exception. Now after adding the returns call, nothing changes.

Any help will be appreciated ..



--