sink type error in scala

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

sink type error in scala

Fanbin Bu
Hi

I have my sink defined as:
class MyAwesomeSink() extends RichSinkFunction[(Boolean, Row)] {
...
}

But compile complains when I use it like:
val sink = new MyAwesomeSink()
tableEnv.toRetractStream(queryResult, classOf[Row]).addSink(sink)

 found   : MyAwesomeSink
 required: org.apache.flink.streaming.api.functions.sink.SinkFunction[org.apache.flink.api.java.tuple.Tuple2[Boolean,org.apache.flink.types.Row]]


I'm using Flink 1.9 with blink.
I tried 
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala._
and it doesn't work.


Any ideas?

Thanks,
Fanbin


Reply | Threaded
Open this post in threaded view
|

Re: sink type error in scala

Timo Walther
Hi Fanbin,

I think you are mixing different APIs together. We have a Scala and Java
version of both DataStream and Table API. The error message indicates
that `toRetractStream` is called on a Java Table API class because it
returns org.apache.flink.api.java.tuple.Tuple2 but your sink is
implemented in Scala with Scala tuple syntax like `(Boolean, Row)`.

Make sure you are using `org.apache.flink.table.api.scala._` instead of
`org.apache.flink.table.api.java._`.

I hope this helps.

Regards,
Timo


On 15.12.19 03:25, Fanbin Bu wrote:

> Hi
>
> I have my sink defined as:
> class MyAwesomeSink() extends RichSinkFunction[(Boolean, Row)] {
> ...
> }
>
> But compile complains when I use it like:
> val sink = new MyAwesomeSink()
> tableEnv.toRetractStream(queryResult, classOf[Row]).addSink(sink)
>
>   found   : MyAwesomeSink
>   required:
> org.apache.flink.streaming.api.functions.sink.SinkFunction[org.apache.flink.api.java.tuple.Tuple2[Boolean,org.apache.flink.types.Row]]
>
>
> I'm using Flink 1.9 with blink.
> I tried
> import org.apache.flink.api.scala._
> import org.apache.flink.streaming.api.scala._
> and it doesn't work.
>
>
> Any ideas?
>
> Thanks,
> Fanbin
>
>