http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Converting-a-DataStream-into-a-Table-throws-error-tp21975.html
Hi,
I am following this example
This is my dataStream, which is built on a Kafka topic:
// Create a Kafka consumer
val dataStream = streamExecEnv
  .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))

val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker, 'timeissued, 'price)
Compilation fails with this error:
[error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:169: overloaded method value fromDataStream with alternatives:
[error] [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T], fields: String)org.apache.flink.table.api.Table <and>
[error] [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
[error] cannot be applied to (org.apache.flink.streaming.api.datastream.DataStreamSource[String], Symbol, Symbol, Symbol, Symbol)
[error] val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker, 'timeissued, 'price)
[error] ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
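Reading the error again, both alternatives listed take the Java `org.apache.flink.streaming.api.datastream.DataStream` and a `fields: String` argument, and the argument type is the Java `DataStreamSource[String]`. That suggests the Java environment classes are being picked up rather than the Scala ones. Below is a minimal sketch of what the Scala wiring might look like, assuming Flink 1.5/1.6 and the Scala API imports. The broker address, group id and topic name are placeholders, and the single field name `'line` is only for illustration, since a `DataStream[String]` carries one field.

```scala
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
// Scala StreamExecutionEnvironment / DataStream plus implicit TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.table.api.{Table, TableEnvironment}
// Implicit conversion from Scala Symbols ('line) to Table API expressions
import org.apache.flink.table.api.scala._

object MdStreamingSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder connection settings (assumptions, not the real configuration)
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "md_streaming")
    val topicsValue = "md" // placeholder topic name

    // With the scala._ import above, this resolves to the Scala environment
    val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)

    // Scala DataStream[String]: one raw comma-separated line per Kafka message
    val dataStream: DataStream[String] = streamExecEnv
      .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))

    // A stream of plain strings has exactly one field, so only one name is given
    val table1: Table = tableEnv.fromDataStream(dataStream, 'line)
    table1.printSchema()
  }
}
```

Passing four field names would additionally require the stream elements to have four fields, for example a tuple or case class produced by splitting each line first.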
The topic is very simple: each message is a line of comma-separated prices. I tried a mapFunction and flatMap, but neither worked!
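For reference, this is roughly the parsing step I would expect to need before the four field names can apply. It is a sketch only: the `Tick` case class and `parseTick` are hypothetical names, and the column order (key, ticker, timeissued, price) and the types (three strings and a double) are assumptions based on the field names above.

```scala
import scala.util.Try

// Hypothetical record type; field names mirror those passed to fromDataStream.
final case class Tick(key: String, ticker: String, timeissued: String, price: Double)

object TickParser {
  // Splits one comma-separated line into a Tick.
  // Returns None when the line does not have exactly four columns
  // or when the price column is not a valid number.
  def parseTick(line: String): Option[Tick] =
    line.split(",", -1).map(_.trim) match {
      case Array(key, ticker, timeissued, price) =>
        Try(price.toDouble).toOption.map(p => Tick(key, ticker, timeissued, p))
      case _ =>
        None
    }
}
```

With the Scala DataStream API, something like `dataStream.flatMap(line => TickParser.parseTick(line).toList)` should then yield a stream of `Tick` values whose four fields can be named in `fromDataStream`, while malformed lines are dropped.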
Thanks,