Flink 1.11 SQL error: No operators defined in streaming topology. Cannot execute.

Flink 1.11 SQL error: No operators defined in streaming topology. Cannot execute.

Lu Weizheng
Hi,

I am using Flink 1.11 SQL from Java. All my operations are in SQL: I create source tables and insert the results into sink tables, with no other Java operators. I run it in IntelliJ, and I do get the final result in the sink tables. However, I also get the following error. I am not sure whether it is a bug or whether something is wrong in my code. It does not actually affect the computation.

Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1872)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1863)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1848)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
at com.flink.tutorials.java.projects.iot.IoTSQLDemo.main()

Here's my code:

        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, fsSettings);
      
        // create source and sink tables...

        tEnv.executeSql("INSERT INTO sensor_1min_avg " +
                "SELECT " +
                "  room, " +
                "  AVG(temp) AS avg_temp," +
                "  TUMBLE_END(ts, INTERVAL '1' MINUTE) AS end_ts " +
                "FROM sensor " +
                "GROUP BY room, TUMBLE(ts, INTERVAL '1' MINUTE)");

        env.execute("table api");



Re: Flink 1.11 SQL error: No operators defined in streaming topology. Cannot execute.

Timo Walther
Hi Lu,

`env.execute("table api");` is not necessary after FLIP-84 [1]. Every
method that has `execute` in its name will immediately execute a job.
Therefore your `env.execute` has an empty pipeline.
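
To picture why the second call fails, here is a toy model in plain Java. It is only a sketch and not Flink's actual implementation: `ToyEnv` and `ToyTableEnv` are hypothetical stand-ins for `StreamExecutionEnvironment` and `StreamTableEnvironment`, and the only assumption carried over from Flink is that `executeSql` submits its statement right away instead of leaving operators behind in the environment.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Flink code) of an environment whose execute() needs buffered operators. */
public class EmptyPipelineSketch {

    /** Hypothetical stand-in for StreamExecutionEnvironment. */
    static class ToyEnv {
        final List<String> operators = new ArrayList<>();

        void execute(String jobName) {
            if (operators.isEmpty()) {
                // Same message as in the stack trace from the question.
                throw new IllegalStateException(
                        "No operators defined in streaming topology. Cannot execute.");
            }
            operators.clear(); // the buffered pipeline has been handed off
        }
    }

    /** Hypothetical stand-in for StreamTableEnvironment. */
    static class ToyTableEnv {
        final ToyEnv env;
        final List<String> submitted = new ArrayList<>();

        ToyTableEnv(ToyEnv env) {
            this.env = env;
        }

        /** Submits the statement immediately; nothing is added to env.operators. */
        void executeSql(String statement) {
            submitted.add(statement);
        }
    }

    /** Replays the call sequence from the question and reports what env.execute does. */
    static String replay() {
        ToyEnv env = new ToyEnv();
        ToyTableEnv tEnv = new ToyTableEnv(env);
        tEnv.executeSql("INSERT INTO sensor_1min_avg SELECT ...");
        try {
            env.execute("table api");
            return "ok";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(replay());
    }
}
```

In this model the INSERT has already been submitted by the time `env.execute` runs, so the environment has nothing left to execute, which matches the observation that the sink tables are filled even though the exception is thrown.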

Regards,
Timo

[1] https://wiki.apache.org/confluence/pages/viewpage.action?pageId=134745878

On 13.08.20 09:34, Lu Weizheng wrote:

> [...]


Re: Flink 1.11 SQL error: No operators defined in streaming topology. Cannot execute.

Lu Weizheng
Thanks Timo,

So there is no need to call the execute() method in Flink SQL if I do everything from source to sink in SQL.

Best Regards,
Lu

> On Aug 13, 2020, at 3:41 PM, Timo Walther wrote:
>
> [...]


Re: Flink 1.11 SQL error: No operators defined in streaming topology. Cannot execute.

Danny Chan
Weizheng ~

`tEnv.executeSql` executes the SQL asynchronously: it submits a job to the backend cluster with a built-in job name, and the call itself returns a `TableResult` immediately, with a constant affected-row count of -1.

Best,
Danny Chan
On Aug 13, 2020, at 3:46 PM (+0800), Lu Weizheng wrote:
> [...]



Re: Flink 1.11 SQL error: No operators defined in streaming topology. Cannot execute.

Leonard Xu
Hi, Weizheng


On Aug 13, 2020, at 19:44, Danny Chan wrote:

> tEnv.executeSql would execute the SQL asynchronously, e.g. submitting a job to the backend cluster with a builtin job name

`tEnv.executeSql` is an asynchronous method that submits the job immediately. If you are testing in your IDE, you should obtain the TableResult object and wait for the job to finish, as in the following piece of code;
otherwise the `main()` method of your demo may exit before the execution has finished.

TableResult result = tableEnvironment.executeSql("insert into ... ");
// wait for the insert job to finish
result.getJobClient().get()
      .getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
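
The same submit-then-wait pattern can be tried without a Flink dependency. The sketch below is a plain-Java analogy, not Flink code: `submitJob` is a hypothetical stand-in for an asynchronous job submission, and `join()` plays the role of the blocking `get()` calls above (it is the unchecked counterpart of `get()` on `CompletableFuture`).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

/** Plain-Java analogy (no Flink classes) for waiting on an asynchronously submitted job. */
public class AwaitAsyncJobSketch {

    /** Hypothetical stand-in for a job: sets the flag after a short delay. */
    static CompletableFuture<String> submitJob(AtomicBoolean finished) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(200); // pretend the job does some work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            finished.set(true);
            return "FINISHED";
        });
    }

    /** Submits the job, blocks until it completes, and reports whether it ran. */
    static boolean submitAndWait() {
        AtomicBoolean finished = new AtomicBoolean(false);
        // Submission returns at once; the job is usually still running here.
        CompletableFuture<String> job = submitJob(finished);
        // Without this blocking call, the caller could carry on (or return)
        // while the job is still in progress.
        job.join();
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println("job finished before main returned: " + submitAndWait());
    }
}
```

Removing the `join()` line makes `submitAndWait` return while the job is typically still sleeping, which mirrors a demo `main()` returning before the INSERT job has finished.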

Best
Leonard