Hi,

I am following this example. This is my dataStream, which is built on a Kafka topic:

    // Create a Kafka consumer
    val dataStream = streamExecEnv
       .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))

    val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
    val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker, 'timeissued, 'price)

Compiling it throws this error:

    [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:169: overloaded method value fromDataStream with alternatives:
    [error]   [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T], fields: String)org.apache.flink.table.api.Table <and>
    [error]   [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
    [error]  cannot be applied to (org.apache.flink.streaming.api.datastream.DataStreamSource[String], Symbol, Symbol, Symbol, Symbol)
    [error]     val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker, 'timeissued, 'price)
    [error]                                  ^
    [error] one error found
    [error] (compile:compileIncremental) Compilation failed

The topic is very simple: it is comma-separated prices. I tried a MapFunction and flatMap, but neither worked.

Thanks,

Dr Mich Talebzadeh
Hi Mich,

You can try adding "import org.apache.flink.table.api.scala._" so that the Symbol can be recognized as an Expression.

Best,
Hequn

On Wed, Aug 1, 2018 at 6:16 AM, Mich Talebzadeh <[hidden email]> wrote:
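For readers following along, here is a minimal, self-contained sketch (not Mich's actual Kafka job) of what that import enables: with org.apache.flink.table.api.scala._ in scope, the 'field symbols are implicitly converted to Expressions. The object name and the toy record are made up for illustration.

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._   // brings the Symbol -> Expression implicit conversion

    object SymbolImportSketch {
      def main(args: Array[String]): Unit = {
        val env      = StreamExecutionEnvironment.getExecutionEnvironment
        val tableEnv = TableEnvironment.getTableEnvironment(env)

        // A toy stream whose element type already carries four fields.
        val quotes = env.fromElements(
          ("05521df6", "IBM", "2018-07-30T19:51:50", 190.48))

        // With table.api.scala._ in scope, 'key etc. compile as Expressions.
        val table1 = tableEnv.fromDataStream(quotes, 'key, 'ticker, 'timeissued, 'price)
        table1.printSchema()
      }
    }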
In reply to this post by Mich Talebzadeh
Hi Mich,

The fields specified in the fromDataStream API must match the number of fields contained in the DataStream object. Your DataStream's type is just a String; there is an example here. [1]

Thanks,
vino.

2018-08-01 6:16 GMT+08:00 Mich Talebzadeh <[hidden email]>:
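A sketch of vino's point applied to the comma-separated records in this thread: parse each String into a four-field tuple before handing the stream to the Table API, so the arity matches the four field names. The topic name, broker address and group id below are placeholders, and the split-on-comma parsing is an assumption about the record layout.

    import java.util.Properties

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._

    object PriceStreamSketch {
      def main(args: Array[String]): Unit = {
        val properties = new Properties()
        properties.setProperty("bootstrap.servers", "localhost:9092") // placeholder broker
        properties.setProperty("group.id", "md_streaming")            // placeholder group id

        val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
        val tableEnv      = TableEnvironment.getTableEnvironment(streamExecEnv)

        // Raw records look like: 05521df6-...,IBM,2018-07-30T19:51:50,190.48
        val raw: DataStream[String] = streamExecEnv
          .addSource(new FlinkKafkaConsumer011[String]("prices", new SimpleStringSchema(), properties))

        // Parse each line into (key, ticker, timeissued, price) so the arity matches the four field names.
        val parsed: DataStream[(String, String, String, Double)] = raw.map { line =>
          val a = line.split(",")
          (a(0), a(1), a(2), a(3).toDouble)
        }

        val table1 = tableEnv.fromDataStream(parsed, 'key, 'ticker, 'timeissued, 'price)
        tableEnv.toAppendStream[(String, String, String, Double)](table1).print()
        streamExecEnv.execute("price stream sketch")
      }
    }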
In reply to this post by Hequn Cheng
Hi both,

I added the import as Hequn suggested. My stream is very simple and consists of 4 values separated by "," as below:

    05521df6-4ccf-4b2f-b874-eb27d461b305,IBM,2018-07-30T19:51:50,190.48

So this is what I have been trying to do:

    val dataStream = streamExecEnv
       .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))

    val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
    val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker, 'timeissued, 'price)

Note the four columns in the table1 definition. And this is the error being thrown:

    [info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
    [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:152: overloaded method value fromDataStream with alternatives:
    [error]   [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T], fields: String)org.apache.flink.table.api.Table <and>
    [error]   [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
    [error]  cannot be applied to (org.apache.flink.streaming.api.datastream.DataStreamSource[String], Symbol, Symbol, Symbol, Symbol)
    [error]     val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker, 'timeissued, 'price)
    [error]                                  ^
    [error] one error found
    [error] (compile:compileIncremental) Compilation failed

I suspect dataStream may not be compatible with this operation?

Regards,

Dr Mich Talebzadeh
On Wed, 1 Aug 2018 at 04:51, Hequn Cheng <[hidden email]> wrote:
Hi Mich,
I would check your imports again [1]. This is a pure compiler issue that is unrelated to your actual data stream. Also check your project dependencies.

Regards,
Timo

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#implicit-conversion-for-scala

On 01.08.18 at 09:30, Mich Talebzadeh wrote:
Hi Timo,

These are my two Flink Table related imports:

    import org.apache.flink.table.api.Table
    import org.apache.flink.table.api.TableEnvironment

And these are my dependencies, building with SBT:

    libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"
    libraryDependencies += "org.apache.hbase" % "hbase" % "1.2.6"
    libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.6"
    libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.6"
    libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.6"
    libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11" % "1.5.0"
    libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-base" % "1.5.0"
    libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0"
    libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"
    libraryDependencies += "org.apache.flink" %% "flink-streaming-java" % "1.5.0" % "provided"
    libraryDependencies += "org.apache.flink" %% "flink-table" % "1.5.0" % "provided"
    libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0"

There appears to be a conflict somewhere that causes this error:

    [info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
    [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:152: overloaded method value fromDataStream with alternatives:
    [error]   [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T], fields: String)org.apache.flink.table.api.Table <and>
    [error]   [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
    [error]  cannot be applied to (org.apache.flink.streaming.api.datastream.DataStreamSource[String], Symbol, Symbol, Symbol, Symbol)
    [error]     val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker, 'timeissued, 'price)
    [error]                                  ^
    [error] one error found
    [error] (compile:compileIncremental) Compilation failed

Thanks,

Dr Mich Talebzadeh
On Wed, 1 Aug 2018 at 09:17, Timo Walther <[hidden email]> wrote:
If these two imports are the only imports that you added, then you did not follow Hequn's advice or the link that I sent you. You need to add the underscore imports to let Scala do its magic.

Timo

On 01.08.18 at 10:28, Mich Talebzadeh wrote:
Hi,

I believed I had tried Hequn's suggestion, and tried again:

    import org.apache.flink.table.api.Table
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._

Unfortunately I am still getting the same error:

    [info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
    [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:151: overloaded method value fromDataStream with alternatives:
    [error]   [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T], fields: String)org.apache.flink.table.api.Table <and>
    [error]   [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
    [error]  cannot be applied to (org.apache.flink.streaming.api.datastream.DataStreamSource[String], Symbol, Symbol, Symbol, Symbol)
    [error]     val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker, 'timeissued, 'price)
    [error]                                  ^
    [error] one error found
    [error] (compile:compileIncremental) Compilation failed
    [error] Total time: 3 s, completed Aug 1, 2018 12:59:44 PM
    Completed compiling

Dr Mich Talebzadeh
On Wed, 1 Aug 2018 at 10:03, Timo Walther <[hidden email]> wrote:
Hi,

I think you are mixing Java and Scala dependencies. The error refers to org.apache.flink.streaming.api.datastream.DataStream, which is the Java class; you should use the DataStream of the Scala DataStream API.

Best,
Fabian

2018-08-01 14:01 GMT+02:00 Mich Talebzadeh <[hidden email]>:
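In practice that usually means depending on flink-streaming-scala and importing the Scala package, so that StreamExecutionEnvironment and DataStream resolve to their Scala variants. A hedged sketch of that setup follows; the exact dependency line and the check at the end are assumptions, not Fabian's wording.

    // build.sbt: the Scala streaming API instead of flink-streaming-java
    libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.5.0"

    // in the job:
    import org.apache.flink.streaming.api.scala._    // Scala StreamExecutionEnvironment / DataStream
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._        // Scala Table API implicits

    object ScalaApiCheck {
      def main(args: Array[String]): Unit = {
        val env      = StreamExecutionEnvironment.getExecutionEnvironment // scala.StreamExecutionEnvironment
        val tableEnv = TableEnvironment.getTableEnvironment(env)          // scala.StreamTableEnvironment
        println(tableEnv.getClass.getName)  // quick way to confirm which variant is actually in use
      }
    }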
Hi,

FYI, these are my imports:

    import java.util.Properties
    import java.util.Arrays
    import org.apache.flink.api.common.functions.MapFunction
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._
    import org.apache.flink.api.scala._
    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.flink.core.fs.FileSystem
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.slf4j.LoggerFactory
    import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
    import java.util.Calendar
    import java.util.Date
    import java.text.DateFormat
    import java.text.SimpleDateFormat
    import org.apache.log4j.Logger
    import org.apache.log4j.Level
    import sys.process.stringSeqToProcess
    import java.io.File

And this is the simple code:

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", bootstrapServers)
    properties.setProperty("zookeeper.connect", zookeeperConnect)
    properties.setProperty("group.id", flinkAppName)
    properties.setProperty("auto.offset.reset", "latest")
    val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream = streamExecEnv
       .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
    val tableEnv = TableEnvironment.getTableEnvironment(dataStream)
    tableEnv.registerDataStream("table1", dataStream, 'key, 'ticker, 'timeissued, 'price)

And this is the compilation error:

    [info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
    [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:138: overloaded method value getTableEnvironment with alternatives:
    [error]   (executionEnvironment: org.apache.flink.streaming.api.scala.StreamExecutionEnvironment)org.apache.flink.table.api.scala.StreamTableEnvironment <and>
    [error]   (executionEnvironment: org.apache.flink.streaming.api.environment.StreamExecutionEnvironment)org.apache.flink.table.api.java.StreamTableEnvironment <and>
    [error]   (executionEnvironment: org.apache.flink.api.scala.ExecutionEnvironment)org.apache.flink.table.api.scala.BatchTableEnvironment <and>
    [error]   (executionEnvironment: org.apache.flink.api.java.ExecutionEnvironment)org.apache.flink.table.api.java.BatchTableEnvironment
    [error]  cannot be applied to (org.apache.flink.streaming.api.datastream.DataStreamSource[String])
    [error]     val tableEnv = TableEnvironment.getTableEnvironment(dataStream)
    [error]                                     ^
    [error] one error found
    [error] (compile:compileIncremental) Compilation failed
    [error] Total time: 3 s, completed Aug 1, 2018 11:02:33 PM
    Completed compiling

which is really strange.

Dr Mich Talebzadeh
On Wed, 1 Aug 2018 at 13:42, Fabian Hueske <[hidden email]> wrote:
Hi,

You have to pass the StreamExecutionEnvironment to the getTableEnvironment() method, not the DataStream (or DataStreamSource). Change

    val tableEnv = TableEnvironment.getTableEnvironment(dataStream)

to

    val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)

Best,
Fabian

2018-08-02 0:10 GMT+02:00 Mich Talebzadeh <[hidden email]>:
Changed as suggested:

    val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream = streamExecEnv
       .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
    val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
    tableEnv.registerDataStream("table1", streamExecEnv, 'key, 'ticker, 'timeissued, 'price)

Still the same error:

    [info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
    [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:139: overloaded method value registerDataStream with alternatives:
    [error]   [T](name: String, dataStream: org.apache.flink.streaming.api.datastream.DataStream[T], fields: String)Unit <and>
    [error]   [T](name: String, dataStream: org.apache.flink.streaming.api.datastream.DataStream[T])Unit
    [error]  cannot be applied to (String, org.apache.flink.streaming.api.environment.StreamExecutionEnvironment, Symbol, Symbol, Symbol, Symbol)
    [error]     tableEnv.registerDataStream("table1", streamExecEnv, 'key, 'ticker, 'timeissued, 'price)
    [error]              ^
    [error] one error found
    [error] (compile:compileIncremental) Compilation failed
    [error] Total time: 3 s, completed Aug 1, 2018 11:40:47 PM

Thanks anyway.

Dr Mich Talebzadeh
On Wed, 1 Aug 2018 at 23:34, Fabian Hueske <[hidden email]> wrote:
Hi Mich,

It seems that the type of your DataStream is still wrong. If you want to specify four fields, the DataStream type should be something like DataStream[(Type1, Type2, Type3, Type4)], not DataStream[String]. You can try that.

Thanks,
vino

2018-08-02 6:44 GMT+08:00 Mich Talebzadeh <[hidden email]>:
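A sketch of that suggestion; note also that the compiler message above already hints that registerDataStream expects the DataStream itself, not the StreamExecutionEnvironment. The field names come from the thread; the object name and toy records are made up for illustration.

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._

    object RegisterTupleStreamSketch {
      def main(args: Array[String]): Unit = {
        val env      = StreamExecutionEnvironment.getExecutionEnvironment
        val tableEnv = TableEnvironment.getTableEnvironment(env)

        // Stand-in for the parsed Kafka stream: a four-field tuple per record.
        val prices: DataStream[(String, String, String, Double)] = env.fromElements(
          ("05521df6", "IBM", "2018-07-30T19:51:50", 190.48),
          ("19f787a1", "MSFT", "2018-07-30T19:51:51", 106.27))

        // Register the DataStream (not the StreamExecutionEnvironment) under a table name.
        tableEnv.registerDataStream("priceTable", prices, 'key, 'ticker, 'timeissued, 'price)

        val result = tableEnv.sqlQuery("SELECT ticker, price FROM priceTable")
        tableEnv.toAppendStream[(String, Double)](result).print()
        env.execute("register tuple stream sketch")
      }
    }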
In reply to this post by Mich Talebzadeh
How does your build.sbt look, especially the dependencies?

On 2 Aug 2018, at 00:44, Mich Talebzadeh <[hidden email]> wrote:
Hi Jorn,

Here you go, the dependencies:

    libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"
    libraryDependencies += "org.apache.hbase" % "hbase" % "1.2.6"
    libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.6"
    libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.6"
    libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.6"
    libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11" % "1.5.0"
    libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-base" % "1.5.0"
    libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0"
    libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"
    libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.5.0"
    libraryDependencies += "org.apache.flink" %% "flink-table" % "1.5.0" % "provided"
    libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0"

Thanks,

Dr Mich Talebzadeh
On Thu, 2 Aug 2018 at 06:19, Jörn Franke <[hidden email]> wrote:
In reply to this post by vino yang
Thanks everyone for the advice. This worked and passed the compilation error:

    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._
    import org.apache.flink.api.scala._
    ….
    val dataStream = streamExecEnv
       .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
    val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
    tableEnv.registerDataStream("priceTable", dataStream, "key, ticker, timeissued, price")
    sqltext = "SELECT key from priceTable";
    val result = tableEnv.sql(sqltext);

Now I get this runtime error:

    ….
    [success] Total time: 16 s, completed Aug 2, 2018 8:23:12 AM
    Completed compiling
    Thu Aug 2 08:23:12 BST 2018 , Running in **Standalone mode**
    Starting execution of program
    java.lang.NoClassDefFoundError: org/apache/flink/table/api/TableEnvironment$
            at md_streaming$.main(md_streaming.scala:140)
            at md_streaming.main(md_streaming.scala)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
            at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
            at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
            at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781)
            at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275)
            at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
            at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
            at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
            at java.security.AccessController.doPrivileged(Native Method)
            at javax.security.auth.Subject.doAs(Subject.java:422)
            at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
            at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
            at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
    Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.api.TableEnvironment$
            at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
            at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
            at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

Dr Mich Talebzadeh
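For what it is worth, a NoClassDefFoundError for org.apache.flink.table.api.TableEnvironment$ at submission time usually means the flink-table classes are neither bundled into the job jar nor on the cluster classpath, which would fit the "provided" scope in the build above (a stock Flink 1.5 distribution ships flink-table under opt/, not lib/). A hedged sketch of one way to address it, assuming the fat jar should simply carry the dependency:

    // build.sbt: drop the "provided" scope so flink-table is packaged into the job jar
    // (alternatively, copy opt/flink-table_2.11-1.5.0.jar from the distribution into the cluster's lib/ directory)
    libraryDependencies += "org.apache.flink" %% "flink-table" % "1.5.0"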
On Thu, 2 Aug 2018 at 03:07, vino yang <[hidden email]> wrote:
In reply to this post by Mich Talebzadeh
Hi Mich,
Could you share your project with us (maybe on GitHub)? Then we can import it and debug what the problem is.

Regards,
Timo

On 02.08.18 at 07:37, Mich Talebzadeh wrote:
Tremendous. Many thanks. Put the sbt build file and the Scala code here.

Regards,

Dr Mich Talebzadeh
On Thu, 2 Aug 2018 at 08:27, Timo Walther <[hidden email]> wrote:
Whenever you use Scala and there is a Scala-specific class, use it.

Remove:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

Add:

    import org.apache.flink.streaming.api.scala._

This will use org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.

Timo

On 02.08.18 at 09:47, Mich Talebzadeh wrote:
Thanks Timo,

Did as suggested; getting this compilation error:

    [info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
    [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:136: could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]
    [error]        .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
    [error]         ^
    [error] one error found
    [error] (compile:compileIncremental) Compilation failed

Dr Mich Talebzadeh
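The "evidence parameter" in that message is the implicit TypeInformation[String] that the Scala addSource requires; it is normally supplied by having org.apache.flink.streaming.api.scala._ imported in the file that makes the call, since that package object provides the createTypeInformation implicit. A minimal sketch, with placeholder topic, broker and group id settings:

    import java.util.Properties

    import org.apache.flink.streaming.api.scala._   // provides createTypeInformation, the implicit evidence addSource needs
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema

    object EvidenceSketch {
      def main(args: Array[String]): Unit = {
        val properties = new Properties()
        properties.setProperty("bootstrap.servers", "localhost:9092") // placeholder broker
        properties.setProperty("group.id", "md_streaming")            // placeholder group id

        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // With streaming.api.scala._ in scope at this call site, the implicit
        // TypeInformation[String] evidence parameter is derived automatically.
        val dataStream: DataStream[String] = env
          .addSource(new FlinkKafkaConsumer011[String]("prices", new SimpleStringSchema(), properties))

        dataStream.print()
        env.execute("evidence sketch")
      }
    }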
On Thu, 2 Aug 2018 at 09:01, Timo Walther <[hidden email]> wrote: