Hi,
I am using Flink 1.11 SQL using java. All my operations are in SQL. I create source tables and insert result into sink tables. No other Java operators. I execute it in Intellij. I can get the final result in the sink tables. However I
get the following error. I am not sure it is a bug or there is something wrong in my code? Acutally it does not affect the computation.
Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1872)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1863)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1848)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
at com.flink.tutorials.java.projects.iot.IoTSQLDemo.main()
Here's my code:
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, fsSettings);
// create source and sink tables...
tEnv.executeSql("INSERT INTO sensor_1min_avg " +
"SELECT " +
" room, " +
" AVG(temp) AS avg_temp," +
" TUMBLE_END(ts, INTERVAL '1' MINUTE) AS end_ts " +
"FROM sensor " +
"GROUP BY room, TUMBLE(ts, INTERVAL '1' MINUTE)");
env.execute("table api");
|
Hi Lu,
`env.execute("table api");` is not necessary after FLIP-84 [1]. Every method that has `execute` in its name will immediately execute a job. Therefore your `env.execute` has an empty pipeline. Regards, Timo [1] https://wiki.apache.org/confluence/pages/viewpage.action?pageId=134745878 On 13.08.20 09:34, Lu Weizheng wrote: > Hi, > > I am using Flink 1.11 SQL using java. All my operations are in SQL. I > create source tables and insert result into sink tables. No other Java > operators. I execute it in Intellij. I can get the final result in the > sink tables. However I get the following error. I am not sure it is a > bug or there is something wrong in my code? Acutally it does not affect > the computation. > > /Exception in thread "main" java.lang.IllegalStateException: No > operators defined in streaming topology. Cannot execute./ > /at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1872)/ > /at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1863)/ > /at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1848)/ > /at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)/ > /at com.flink.tutorials.java.projects.iot.IoTSQLDemo.main()/ > > Here's my code: > > EnvironmentSettings fsSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > StreamTableEnvironment tEnv = > StreamTableEnvironment.create(env, fsSettings); > // create source and sink tables... > > tEnv.executeSql("INSERT INTO sensor_1min_avg " + > "SELECT " + > " room, " + > " AVG(temp) AS avg_temp," + > " TUMBLE_END(ts, INTERVAL '1' MINUTE) AS end_ts " + > "FROM sensor " + > "GROUP BY room, TUMBLE(ts, INTERVAL '1' MINUTE)"); > > env.execute("table api"); > > |
Thanks Timo,
So no need to use execute() method in Flink SQL If I do all the thins from source to sink in SQL. Best Regards, Lu > 2020年8月13日 下午3:41,Timo Walther <[hidden email]> 写道: > > Hi Lu, > > `env.execute("table api");` is not necessary after FLIP-84 [1]. Every method that has `execute` in its name will immediately execute a job. Therefore your `env.execute` has an empty pipeline. > > Regards, > Timo > > [1] https://wiki.apache.org/confluence/pages/viewpage.action?pageId=134745878 > > On 13.08.20 09:34, Lu Weizheng wrote: >> Hi, >> I am using Flink 1.11 SQL using java. All my operations are in SQL. I create source tables and insert result into sink tables. No other Java operators. I execute it in Intellij. I can get the final result in the sink tables. However I get the following error. I am not sure it is a bug or there is something wrong in my code? Acutally it does not affect the computation. >> /Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute./ >> /at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1872)/ >> /at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1863)/ >> /at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1848)/ >> /at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)/ >> /at com.flink.tutorials.java.projects.iot.IoTSQLDemo.main()/ >> Here's my code: >> EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); >> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); >> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, fsSettings); >> // create source and sink tables... >> tEnv.executeSql("INSERT INTO sensor_1min_avg " + >> "SELECT " + >> " room, " + >> " AVG(temp) AS avg_temp," + >> " TUMBLE_END(ts, INTERVAL '1' MINUTE) AS end_ts " + >> "FROM sensor " + >> "GROUP BY room, TUMBLE(ts, INTERVAL '1' MINUTE)"); >> env.execute("table api"); > |
Weighing ~
tEnv.executeSql would execute the SQL asynchronously, e.g. submitting a job to the backend cluster with a builtin job name, the tEnv.executeSql itself did return a JobResult immediately with a constant affected rows count -1. Best,
Danny Chan
在 2020年8月13日 +0800 PM3:46,Lu Weizheng <[hidden email]>,写道:
Thanks Timo, |
Hi, Weizheng
`tEnv.executeSql` is an asynchronous method which will submit the job immediately. If you’re test in your IDE, you’d better obtain the TableResult object and wait for the execution as following piece of code, otherwise your `main()` method in demo may exit before the execution finished. TableResult result = tableEnvironment.executeSql("insert into ... "); // wait for the insert job finished result.getJobClient().get() .getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get(); Best Leonard
|
Free forum by Nabble | Edit this page |