Multiple SQL Optimization


forideal
Hello
   
There are 3 SQL statements, all querying the same table, but the generated DAG consists of 3 independent topologies. I think the better result would be one source and 3 sinks.
 
create table good_sink (data varchar) with (
  'connector.type' = 'console',
  'connector.dry-run' = 'false',
  'connector.property-version' = '1',
  'update-mode' = 'append'
);

create table atomic_sink (data varchar) with (
  'connector.type' = 'console',
  'connector.dry-run' = 'false',
  'connector.property-version' = '1',
  'update-mode' = 'append'
);

create table bad_sink (data varchar) with (
  'connector.type' = 'console',
  'connector.dry-run' = 'false',
  'connector.property-version' = '1',
  'update-mode' = 'append'
);

create table source_stream (data varchar, `key` varchar) with ( xxx );

insert into good_sink select data from source_stream where `key` = 'good';
insert into atomic_sink select data from source_stream where `key` = 'atomic';
insert into bad_sink select data from source_stream where `key` = 'bad';

Best Wishes 


 


Re: Multiple SQL Optimization

Jark Wu-3
Hi forideal,

Are you using `StreamTableEnvironment` or the SQL CLI?
Currently, only `TableEnvironment` with the Blink planner has the multi-sink optimization (reusing shared upstream operators).

Best,
Jark

On Fri, 10 Apr 2020 at 16:31, forideal <[hidden email]> wrote:

Re: Multiple SQL Optimization

godfrey he
In reply to this post by forideal
Hi forideal,
Currently, the Blink planner with TableEnvironment supports multi-sink optimization, which tries its best to reuse common sub-graphs.
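As a rough illustration of what "reusing a common sub-graph" means, here is a toy model in plain Java. It is not Blink planner code: `SubPlanReuse`, `intern`, and the string digests are hypothetical names chosen for this sketch. Each plan node is keyed by its operator plus the id of its already-deduplicated input, so the three INSERT plans from the first post collapse into a single `Scan(source_stream)` feeding three filter-and-sink branches.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Toy model of sub-plan reuse (hypothetical, not Flink code). */
public final class SubPlanReuse {

    /** A plan node: an operator description plus its single input (null for a source). */
    static final class Node {
        final int id;
        final String op;
        final Node input;
        final List<Node> consumers = new ArrayList<>();

        Node(int id, String op, Node input) {
            this.id = id;
            this.op = op;
            this.input = input;
        }
    }

    // Canonical nodes keyed by digest; insertion order is kept for readable output.
    private final Map<String, Node> canonical = new LinkedHashMap<>();

    /** Returns the existing node for an identical sub-plan, or creates and registers one. */
    Node intern(String op, Node input) {
        // Inputs are already canonical, so an input id identifies its whole upstream sub-plan.
        String digest = op + "(" + (input == null ? "-" : input.id) + ")";
        Node existing = canonical.get(digest);
        if (existing != null) {
            return existing; // reuse: no new node and no new edge
        }
        Node created = new Node(canonical.size(), op, input);
        canonical.put(digest, created);
        if (input != null) {
            input.consumers.add(created);
        }
        return created;
    }

    /** Adds one "INSERT INTO sink SELECT data FROM source_stream WHERE key = ..." plan. */
    void addInsert(String sink, String key) {
        Node scan = intern("Scan(source_stream)", null);
        Node filter = intern("Filter(key = '" + key + "')", scan);
        intern("Sink(" + sink + ")", filter);
    }

    /** All nodes without an input, i.e. the sources of the merged DAG. */
    List<Node> sources() {
        return canonical.values().stream()
                .filter(n -> n.input == null)
                .collect(Collectors.toList());
    }

    int nodeCount() {
        return canonical.size();
    }

    /** Builds the merged plan for the three INSERT statements from the first post. */
    static SubPlanReuse threeSinkExample() {
        SubPlanReuse plan = new SubPlanReuse();
        plan.addInsert("good_sink", "good");
        plan.addInsert("atomic_sink", "atomic");
        plan.addInsert("bad_sink", "bad");
        return plan;
    }

    public static void main(String[] args) {
        SubPlanReuse plan = threeSinkExample();
        for (Node n : plan.canonical.values()) {
            String in = n.input == null ? "" : " <- #" + n.input.id;
            System.out.println("#" + n.id + " " + n.op + in);
        }
    }
}
```

Because the three filters differ, the branches only split after the shared scan. The real planner works on Calcite relational plans and is considerably more involved; this sketch only shows the deduplication idea.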

Best,
Godfrey

On Fri, 10 Apr 2020 at 16:31, forideal <[hidden email]> wrote:

Re: Re: Multiple SQL Optimization

forideal
In reply to this post by Jark Wu-3
Hi Jark

Thanks for your reply.
In my code, I use `TableEnvironment` with the Blink planner.

this.tableEnv = StreamTableEnvironment.create(
        env,
        EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build());

// submit the three SQL statements
sqlsWithoutFunc.forEach(sql -> {
    tableEnv.sqlUpdate(sql);
});

env.execute(jobName);

Best Wishes

At 2020-04-10 16:35:33, "Jark Wu" <[hidden email]> wrote:


Re: Re: Multiple SQL Optimization

Jark Wu-3
Hi forideal,

You are using `StreamTableEnvironment`, which doesn't support multi-sink optimization in 1.10 :)
You should change `StreamTableEnvironment.create` to `TableEnvironment.create`. 

Btw, StreamTableEnvironment will also support multi-sink optimization in 1.11.
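Following the suggestion above, a minimal sketch for Flink 1.10, assuming the Blink planner dependency is on the classpath. `MultiSinkJob`, `run`, and its parameters are hypothetical names standing in for the `sqlsWithoutFunc` list and `jobName` from the earlier snippet. The two changes are creating a unified `TableEnvironment` and triggering execution with `tableEnv.execute(...)` instead of `env.execute(...)`.

```java
import java.util.List;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/** Hypothetical wrapper class; a sketch for Flink 1.10, not a definitive implementation. */
public final class MultiSinkJob {

    /**
     * Submits all DDL and INSERT statements through one unified TableEnvironment
     * so the Blink planner can optimize the INSERTs together as a multi-sink plan.
     */
    public static JobExecutionResult run(List<String> statements, String jobName) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        // TableEnvironment (not StreamTableEnvironment) is the 1.10 entry point
        // with multi-sink optimization.
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        for (String sql : statements) {
            // DDL is executed immediately; INSERT INTO statements are buffered until execute().
            tableEnv.sqlUpdate(sql);
        }

        // Print the plan of the buffered sinks; with sub-plan reuse we would expect
        // source_stream to be scanned only once.
        System.out.println(tableEnv.explain(false));

        // There is no StreamExecutionEnvironment here, so the job is submitted via the TableEnvironment.
        return tableEnv.execute(jobName);
    }
}
```

In 1.11, the equivalent on `StreamTableEnvironment` would be to collect the INSERT statements with `tableEnv.createStatementSet()`, call `addInsertSql(sql)` for each one, and submit them together with `execute()`.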

Best,
Jark



On Fri, 10 Apr 2020 at 16:46, forideal <[hidden email]> wrote: