Re: type error with generics ..

Posted by Rong Rong on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/type-error-with-generics-tp29605p29639.html

I am not sure how the function `readStream` is implemented (also which version of Flink are you using?). 
Can you share more information on your code blocks and exception logs?

Also to answer your question, DataStream return type is determined by its underlying transformation, so you cannot set it directly.

Thanks,
Rong

On Sat, Aug 24, 2019 at 12:29 PM Debasish Ghosh <[hidden email]> wrote:
Thanks .. I tried this ..

DataStream<Data> ins = readStream(in, Data.class, serdeData).map((Data d) -> d).returns(new TypeHint<Data>(){}.getTypeInfo());

But still get the same error on this line ..

(BTW I am not sure how to invoke returns on a DataStream and hence had to do a fake map - any suggestions here ?)

regards.

On Sat, Aug 24, 2019 at 10:26 PM Rong Rong <[hidden email]> wrote:
Hi Debasish,

I think the error refers to the output of your source instead of your result of the map function. E.g.
DataStream<Data> ins = readStream(in, Data.class, serdeData).returns(new TypeInformation<Data>);
DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName())).returns(new TypeHint<Simple>(){}.getTypeInfo());

--
Rong

On Fri, Aug 23, 2019 at 9:55 AM Debasish Ghosh <[hidden email]> wrote:
Hello -

I have the following call to addSource where I pass a Custom SourceFunction ..

env.<Data>addSource(
  new CollectionSourceFunctionJ<Data>(data, TypeInformation.<Data>of(new TypeHint<Data>(){}))
)

where data is List<Data> and CollectionSourceFunctionJ is a Scala case class ..

case class CollectionSourceFunctionJ[T](data: java.util.List[T], ti: TypeInformation[T]) extends SourceFunction[T] {
  def cancel(): Unit = {}
  def run(ctx: SourceContext[T]): Unit = {
    data.asScala.foreach(d ⇒ ctx.collect(d))
  }
}


When the following transformation runs ..

DataStream<Data> ins = readStream(in, Data.class, serdeData);
DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName())).returns(new TypeHint<Simple>(){}.getTypeInfo());


I get the following exception in the second line ..

org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'Custom Source' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.

Initially the returns call was not there and I was getting the same exception. Now after adding the returns call, nothing changes.

Any help will be appreciated ..



--