Hi all,
I want to write a program, a thread read the real-time message from /var/log/messages and write them to kafaka, and it works. Then I want to use sql of flink to query the messages, and the following are my code: ----------------------------------------------------------------------------------------------------------- // set up the execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(2); StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); DataStream<String> text = env.addSource(new FlinkKafkaConsumer08<String>("test", new SimpleStringSchema(), properties)); DataStream<Tuple4<Long, String, String, String>> messages = text.flatMap(new Tokenizer()); tableEnv.registerDataStream("Syslogs", messages, "time, user, process, msg"); Table result = tableEnv.sql( "SELECT STREAM msg FROM Syslogs WHERE msg LIKE '%system%'" ); TableSink sink = new CsvTableSink("/home/jiecxy/Desktop/test.csv", "|"); result.writeToSink(sink); // execute program env.execute(); ----------------------------------------------------------------------------------------------------------- Note: the class Tokenizer is to transfer the log to four parts. Like this: Sep 6 09:28:01 master systemd: Stopping user-988.slice. to Tuple4<time, master, systemd, Stopping user-988.slice.> But when I ran it use Flink: bin/flink run readlog.jar I got the exception... What should I do? Starting execution of program ------------------------------------------------------------ The program finished with the following exception: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error. at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:524) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331) at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777) at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253) at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005) at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048) Caused by: java.lang.RuntimeException: java.sql.SQLException: No suitable driver found for jdbc:calcite: at org.apache.calcite.tools.Frameworks.withPrepare(Frameworks.java:151) at org.apache.calcite.tools.Frameworks.withPlanner(Frameworks.java:106) at org.apache.calcite.tools.Frameworks.withPlanner(Frameworks.java:127) at org.apache.flink.api.table.FlinkRelBuilder$.create(FlinkRelBuilder.scala:56) at org.apache.flink.api.table.TableEnvironment.<init>(TableEnvironment.scala:73) at org.apache.flink.api.table.StreamTableEnvironment.<init>(StreamTableEnvironment.scala:58) at org.apache.flink.api.java.table.StreamTableEnvironment.<init>(StreamTableEnvironment.scala:45) at org.apache.flink.api.table.TableEnvironment$.getTableEnvironment(TableEnvironment.scala:376) at org.apache.flink.api.table.TableEnvironment.getTableEnvironment(TableEnvironment.scala) at org.myorg.quickstart.ReadingFromKafka2.main(ReadingFromKafka2.java:48) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509) ... 6 more Caused by: java.sql.SQLException: No suitable driver found for jdbc:calcite: at java.sql.DriverManager.getConnection(DriverManager.java:689) at java.sql.DriverManager.getConnection(DriverManager.java:208) at org.apache.calcite.tools.Frameworks.withPrepare(Frameworks.java:144) ... 20 more |
Hi,
this looks like a bug. I created an issue for it (https://issues.apache.org/jira/browse/FLINK-4581). Could you also send us the pom.xml you are using for your project? Timo Am 06/09/16 um 13:47 schrieb jiecxy: > Hi all, > I want to write a program, a thread read the real-time message from > /var/log/messages and write them to kafaka, and it works. Then I want to use > sql of flink to query the messages, and the following are my code: > > ----------------------------------------------------------------------------------------------------------- > > // set up the execution environment > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > env.setParallelism(2); > > StreamTableEnvironment tableEnv = > TableEnvironment.getTableEnvironment(env); > > > DataStream<String> text = env.addSource(new > FlinkKafkaConsumer08<String>("test", new SimpleStringSchema(), properties)); > DataStream<Tuple4<Long, String, String, String>> messages = > text.flatMap(new Tokenizer()); > tableEnv.registerDataStream("Syslogs", messages, "time, user, > process, msg"); > > Table result = tableEnv.sql( > "SELECT STREAM msg FROM Syslogs WHERE msg LIKE '%system%'" > ); > > > TableSink sink = new CsvTableSink("/home/jiecxy/Desktop/test.csv", > "|"); > result.writeToSink(sink); > > // execute program > env.execute(); > ----------------------------------------------------------------------------------------------------------- > Note: the class Tokenizer is to transfer the log to four parts. Like this: > Sep 6 09:28:01 master systemd: Stopping user-988.slice. > to > Tuple4<time, master, systemd, Stopping user-988.slice.> > > > But when I ran it use Flink: > bin/flink run readlog.jar > > I got the exception... What should I do? > > > Starting execution of program > > ------------------------------------------------------------ > The program finished with the following exception: > > org.apache.flink.client.program.ProgramInvocationException: The main method > caused an error. > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:524) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331) > at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048) > Caused by: java.lang.RuntimeException: java.sql.SQLException: No suitable > driver found for jdbc:calcite: > at org.apache.calcite.tools.Frameworks.withPrepare(Frameworks.java:151) > at org.apache.calcite.tools.Frameworks.withPlanner(Frameworks.java:106) > at org.apache.calcite.tools.Frameworks.withPlanner(Frameworks.java:127) > at > org.apache.flink.api.table.FlinkRelBuilder$.create(FlinkRelBuilder.scala:56) > at > org.apache.flink.api.table.TableEnvironment.<init>(TableEnvironment.scala:73) > at > org.apache.flink.api.table.StreamTableEnvironment.<init>(StreamTableEnvironment.scala:58) > at > org.apache.flink.api.java.table.StreamTableEnvironment.<init>(StreamTableEnvironment.scala:45) > at > org.apache.flink.api.table.TableEnvironment$.getTableEnvironment(TableEnvironment.scala:376) > at > org.apache.flink.api.table.TableEnvironment.getTableEnvironment(TableEnvironment.scala) > at org.myorg.quickstart.ReadingFromKafka2.main(ReadingFromKafka2.java:48) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509) > ... 6 more > Caused by: java.sql.SQLException: No suitable driver found for jdbc:calcite: > at java.sql.DriverManager.getConnection(DriverManager.java:689) > at java.sql.DriverManager.getConnection(DriverManager.java:208) > at org.apache.calcite.tools.Frameworks.withPrepare(Frameworks.java:144) > ... 20 more > > > > > -- > View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Stream-sql-query-in-Flink-tp8928.html > Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com. -- Freundliche Grüße / Kind Regards Timo Walther Follow me: @twalthr https://www.linkedin.com/in/twalthr |
I have opened a PR which should solve your problem. Would be great if
you could test it. https://github.com/apache/flink/pull/2506 Timo Am 06/09/16 um 14:31 schrieb Timo Walther: > Hi, > > this looks like a bug. I created an issue for it > (https://issues.apache.org/jira/browse/FLINK-4581). Could you also > send us the pom.xml you are using for your project? > > Timo > > Am 06/09/16 um 13:47 schrieb jiecxy: >> Hi all, >> I want to write a program, a thread read the real-time message from >> /var/log/messages and write them to kafaka, and it works. Then I want >> to use >> sql of flink to query the messages, and the following are my code: >> >> ----------------------------------------------------------------------------------------------------------- >> >> >> // set up the execution environment >> final StreamExecutionEnvironment env = >> StreamExecutionEnvironment.getExecutionEnvironment(); >> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); >> env.setParallelism(2); >> >> StreamTableEnvironment tableEnv = >> TableEnvironment.getTableEnvironment(env); >> >> >> DataStream<String> text = env.addSource(new >> FlinkKafkaConsumer08<String>("test", new SimpleStringSchema(), >> properties)); >> DataStream<Tuple4<Long, String, String, String>> messages = >> text.flatMap(new Tokenizer()); >> tableEnv.registerDataStream("Syslogs", messages, "time, user, >> process, msg"); >> >> Table result = tableEnv.sql( >> "SELECT STREAM msg FROM Syslogs WHERE msg LIKE >> '%system%'" >> ); >> >> >> TableSink sink = new >> CsvTableSink("/home/jiecxy/Desktop/test.csv", >> "|"); >> result.writeToSink(sink); >> >> // execute program >> env.execute(); >> ----------------------------------------------------------------------------------------------------------- >> >> Note: the class Tokenizer is to transfer the log to four parts. Like >> this: >> Sep 6 09:28:01 master systemd: Stopping user-988.slice. >> to >> Tuple4<time, master, systemd, Stopping user-988.slice.> >> >> >> But when I ran it use Flink: >> bin/flink run readlog.jar >> >> I got the exception... What should I do? >> >> >> Starting execution of program >> >> ------------------------------------------------------------ >> The program finished with the following exception: >> >> org.apache.flink.client.program.ProgramInvocationException: The main >> method >> caused an error. >> at >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:524) >> >> at >> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) >> >> at >> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331) >> >> at >> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777) >> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253) >> at >> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005) >> >> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048) >> Caused by: java.lang.RuntimeException: java.sql.SQLException: No >> suitable >> driver found for jdbc:calcite: >> at >> org.apache.calcite.tools.Frameworks.withPrepare(Frameworks.java:151) >> at >> org.apache.calcite.tools.Frameworks.withPlanner(Frameworks.java:106) >> at >> org.apache.calcite.tools.Frameworks.withPlanner(Frameworks.java:127) >> at >> org.apache.flink.api.table.FlinkRelBuilder$.create(FlinkRelBuilder.scala:56) >> >> at >> org.apache.flink.api.table.TableEnvironment.<init>(TableEnvironment.scala:73) >> >> at >> org.apache.flink.api.table.StreamTableEnvironment.<init>(StreamTableEnvironment.scala:58) >> >> at >> org.apache.flink.api.java.table.StreamTableEnvironment.<init>(StreamTableEnvironment.scala:45) >> >> at >> org.apache.flink.api.table.TableEnvironment$.getTableEnvironment(TableEnvironment.scala:376) >> >> at >> org.apache.flink.api.table.TableEnvironment.getTableEnvironment(TableEnvironment.scala) >> >> at >> org.myorg.quickstart.ReadingFromKafka2.main(ReadingFromKafka2.java:48) >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> >> at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> >> at java.lang.reflect.Method.invoke(Method.java:498) >> at >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509) >> >> ... 6 more >> Caused by: java.sql.SQLException: No suitable driver found for >> jdbc:calcite: >> at java.sql.DriverManager.getConnection(DriverManager.java:689) >> at java.sql.DriverManager.getConnection(DriverManager.java:208) >> at >> org.apache.calcite.tools.Frameworks.withPrepare(Frameworks.java:144) >> ... 20 more >> >> >> >> >> -- >> View this message in context: >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Stream-sql-query-in-Flink-tp8928.html >> Sent from the Apache Flink User Mailing List archive. mailing list >> archive at Nabble.com. > > -- Freundliche Grüße / Kind Regards Timo Walther Follow me: @twalthr https://www.linkedin.com/in/twalthr |
Free forum by Nabble | Edit this page |