Table Environment for Remote Execution

Satyam Shekhar
Hello,

I am running into a very basic problem while working with the Table API. I wish to create a TableEnvironment connected to a remote environment that uses the Blink planner in batch mode. The examples and documentation I have come across so far recommend the following pattern to create such an environment:

var settings = EnvironmentSettings.newInstance()
                  .useBlinkPlanner()
                  .inBatchMode()
                  .build();
var tEnv = TableEnvironment.create(settings);


The above configuration, however, does not connect to a remote environment. Tracing the code in TableEnvironment.java, I see the following method in BlinkExecutorFactory.java that appears to be relevant:

Executor create(Map<String, String>, StreamExecutionEnvironment);

However, it seems to be only accessible through the Scala bridge. I can't seem to find a way to instantiate a TableEnvironment that takes StreamExecutionEnvironment as an argument. How do I achieve that?

Regards,
Satyam

Re: Table Environment for Remote Execution

godfrey he
Hi Satyam,

For Blink batch mode, only TableEnvironment can be used, and TableEnvironment does not take a StreamExecutionEnvironment as an argument. Instead, the StreamExecutionEnvironment instance is created internally.

Back to your requirement: you can build your table program as a user jar and submit the job to the remote environment through the Flink CLI [1].
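
A minimal sketch of how a Java program might assemble such a CLI submission, assuming the `flink` launcher script is available on the host and the job jar is already built. The helper `FlinkCliCommand.build` and all concrete values (`/opt/flink/bin/flink`, `jobmanager.example.com:8081`, `com.example.TableJob`, `/tmp/table-job.jar`, the `--mode batch` program arguments) are hypothetical; `-m` (JobManager address) and `-c` (entry class) are the usual `flink run` options. The sketch only builds the command and does not launch it.

```java
import java.util.ArrayList;
import java.util.List;

final class FlinkCliCommand {

    /** Builds the argument list for `flink run`; nothing is executed here. */
    static List<String> build(String flinkBin, String jobManager, String mainClass,
                              String jarPath, List<String> programArgs) {
        List<String> cmd = new ArrayList<>();
        cmd.add(flinkBin);          // path to the flink launcher script (assumed)
        cmd.add("run");
        cmd.add("-m");              // address of the remote JobManager
        cmd.add(jobManager);
        cmd.add("-c");              // entry class inside the user jar
        cmd.add(mainClass);
        cmd.add(jarPath);           // the packaged table program
        cmd.addAll(programArgs);    // arguments passed through to the program's main()
        return cmd;
    }

    public static void main(String[] args) {
        List<String> cmd = build(
                "/opt/flink/bin/flink",          // hypothetical install path
                "jobmanager.example.com:8081",   // hypothetical cluster address
                "com.example.TableJob",          // hypothetical entry class
                "/tmp/table-job.jar",            // hypothetical jar location
                List.of("--mode", "batch"));     // hypothetical program arguments
        System.out.println(String.join(" ", cmd));
        // To actually submit, one could hand the list to ProcessBuilder:
        // new ProcessBuilder(cmd).inheritIO().start().waitFor();
    }
}
```

Keeping the command as a list of separate arguments, rather than one concatenated string, avoids shell quoting problems when SQL text or file paths contain spaces.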

Bests,
Godfrey

Re: Table Environment for Remote Execution

Satyam Shekhar
Thanks for the reply, Godfrey.

I would also love to learn the reasoning behind that limitation.

For more context, I am building a Java application that receives user input via a gRPC service. The user's input is translated to SQL that may be executed in streaming or batch mode, based on custom business logic, and is then submitted to Flink for execution. In my current setup, I create an ExecutionEnvironment, register sources, and execute the corresponding SQL. I was able to achieve the desired behavior with StreamTableEnvironment, but it has limitations around the SQL it supports in batch mode.

While invoking the CLI from a Java program might be a solution, it doesn't feel like the most natural one for this problem. Are there other ways to address this?

Regards,
Satyam

Re: Table Environment for Remote Execution

Jark Wu-3
Hi Satyam,

In the long term, TableEnvironment is the entry point for pure Table/SQL users, so it should have all the capabilities of StreamExecutionEnvironment.
I think remote execution is a reasonable feature. Could you create a JIRA issue for this?

As a workaround, you can construct `StreamTableEnvironmentImpl` yourself via its constructor. It supports both batch mode and a StreamExecutionEnvironment.

Best,
Jark

Re: Table Environment for Remote Execution

Satyam Shekhar

Thanks, Jark & Godfrey.

The workaround was successful.

I have created the following ticket to track the issue - https://issues.apache.org/jira/browse/FLINK-18095

Regards,
Satyam

Re: Table Environment for Remote Execution

Jeff Zhang
Hi Satyam,

I ran into the same issue when integrating Flink with Zeppelin. Here's what I did.

If you are interested in Flink on Zeppelin, you can refer to the following blogs and videos.

Flink on Zeppelin video

Flink on Zeppelin tutorial blogs:
1) Get started https://link.medium.com/oppqD6dIg5
2) Batch https://link.medium.com/3qumbwRIg5
3) Streaming https://link.medium.com/RBHa2lTIg5
4) Advanced usage https://link.medium.com/CAekyoXIg5


--
Best Regards

Jeff Zhang