Setting different idleStateRetentionTime for different queries executed in the same TableEnvironment in Flink 1.10

13 messages

Jiahui Jiang
Hello! I'm using the Table API to write a pipeline with multiple queries, and I want to set a different idleStateRetentionTime for each query.

In Flink 1.8, it seems that I can pass in a StreamQueryConfig when converting each output table into a DataStream, and the translation will take that idleStateRetentionTime into account.

But in Flink 1.10, idleStateRetentionTime is set on the TableConfig and applies to the whole TableEnvironment.

Is there a way to have different idleStateRetentionTime for different queries in 1.10?

I saw that tableEnvironment.insertInto(sink, queryConfig) still allows eager translation. But does that mean that if I have multiple sinks for the same DataStream, each with a different idleStateRetentionTime, the transformation will be executed multiple times?

Thank you!

Jiahui Jiang
I just looked into the source code a bit further and realized that StreamTableEnvironmentImpl also translates lazily, even for sinks. Is there any way to give different transformations different QueryConfigs?

(In reply to Jiahui Jiang, Friday, April 10, 2020 6:46 PM)

Jark Wu-3
Hi Jiahui,

QueryConfig is deprecated and will be removed in the future, because having so many different config classes in the Table API is confusing.
If you want to set a different idleStateRetentionTime for each query, you can set a new idleStateRetentionTime on the TableConfig before executing/submitting that query.

Best,
Jark

(In reply to Jiahui Jiang, Sat, 11 Apr 2020 at 09:21)

Jiahui Jiang
Thank you for answering! I was reading StreamTableEnvironmentImpl/StreamPlanner, and it seems to me that when converting tables to DataStreams, planner.translate takes the current TableConfig into account (that is, it reads the current TableConfig content even though the config is not explicitly passed to translate as an argument). So it seems that if I set the TableConfig right before converting to DataStreams, that should work?

Or did you mean right before the actual tableEnvironment.execute()? We have a whole pipeline of multiple queries that depend on each other, so all the continuous queries have to execute concurrently.

Thanks again!

(In reply to Jark Wu, Saturday, April 11, 2020 1:24 AM)

Jark Wu-3
Yes, that's right. Setting idleStateRetentionTime on the TableConfig before translation should work.
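
A minimal sketch of the per-query recipe confirmed above, written as a toy model in plain Java so it runs without Flink. `EagerTranslationModel`, `RetentionConfig`, and `TranslatedQuery` are hypothetical stand-ins, not Flink classes; they only mimic the behavior described in this thread, where the current TableConfig values are read when a query is translated (for example by `toRetractStream`) rather than when `execute()` is called. In the real 1.10 API, the corresponding steps would be `tEnv.getConfig().setIdleStateRetentionTime(Time.minutes(5), Time.minutes(11))` immediately followed by `tEnv.toRetractStream(table, Row.class)` for each query. Zero for both values is used here to model "no state cleanup", which is my understanding of Flink's default.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Toy model with hypothetical classes; this is NOT Flink's API. It only mimics
// the behavior confirmed in the thread: the shared, mutable table config is read
// when a query is translated (e.g. on toRetractStream), not at execute().
class EagerTranslationModel {

    // Stand-in for the idle state retention part of TableConfig.
    static final class RetentionConfig {
        Duration minTime = Duration.ZERO; // ZERO/ZERO models "never clean up"
        Duration maxTime = Duration.ZERO;

        void setIdleStateRetentionTime(Duration min, Duration max) {
            this.minTime = min;
            this.maxTime = max;
        }
    }

    // Stand-in for a translated query; it keeps the values read at translation time.
    static final class TranslatedQuery {
        final String name;
        final Duration minTime;
        final Duration maxTime;

        TranslatedQuery(String name, Duration minTime, Duration maxTime) {
            this.name = name;
            this.minTime = minTime;
            this.maxTime = maxTime;
        }

        @Override
        public String toString() {
            return name + " min=" + minTime + " max=" + maxTime;
        }
    }

    // Eager translation: the current config values are copied immediately.
    static TranslatedQuery translate(String name, RetentionConfig config) {
        return new TranslatedQuery(name, config.minTime, config.maxTime);
    }

    // Set the retention right before each translation, as suggested in the thread.
    static List<TranslatedQuery> buildPipeline() {
        RetentionConfig config = new RetentionConfig();
        List<TranslatedQuery> queries = new ArrayList<>();

        // Join over an evolving key domain: expire idle state.
        config.setIdleStateRetentionTime(Duration.ofMinutes(5), Duration.ofMinutes(11));
        queries.add(translate("evolvingKeysJoin", config));

        // Join over a stable key domain: keep state indefinitely.
        config.setIdleStateRetentionTime(Duration.ZERO, Duration.ZERO);
        queries.add(translate("stableKeysJoin", config));

        return queries;
    }

    public static void main(String[] args) {
        // Prints each query together with the retention it captured.
        buildPipeline().forEach(System.out::println);
    }
}
```

Because each translated query keeps its own copy of the values, changing the config afterwards does not affect queries that were already translated. This only holds for eager translation; lazily translated sinks behave differently.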

(In reply to Jiahui Jiang, Sat, 11 Apr 2020 at 14:46)

Jiahui Jiang
Hey Jark, thank you so much for confirming!

Out of curiosity: even though I agree that having too many config classes is confusing, not knowing when the config values are used during pipeline setup is also pretty confusing. For example, the name 'TableConfig' makes me feel that it is global to the whole TableEnvironment (which is true) and that it is read only once, at execution (which is not true). Could we surface this, or add some documentation on when these configs are read? 😄

Thank you so much!

(In reply to Jark Wu, Saturday, April 11, 2020 8:45 AM)

Jiahui Jiang
Also, for some more context: we are building a framework that helps users build their Flink pipelines with SQL. Our framework handles all the setup and configuration, so users only need to write SQL queries, without needing any Flink knowledge.

One issue we encountered is that for some of the streams, the key domain keeps evolving, and we want to expire the state for older keys. But there is no easy way to let users configure their state timeout directly through the SQL API.
Currently we ask users to configure idleStateRetentionTime in a custom SQL hint; our framework then parses it and applies it at table registration time.

An example query that users can write right now looks like this:

CREATE TABLE `/output` AS
SELECT /*+ IDLE_STATE_RETENTION_TIME(minTime ='5m', maxTime ='11m') */ *
FROM `/input1` a
INNER JOIN `/input2` b
ON a.column_name = b.column_name;

Is this something Flink SQL might want to support out of the box? (Starting with version 1.22.0, Calcite provides first-class hint parsing.)
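
A sketch of how a framework like the one described above might extract the custom hint before registering a table. `RetentionHintParser` and its duration format (`30s`, `5m`, `2h`, `1d`) are hypothetical; Flink 1.10 does not interpret an `IDLE_STATE_RETENTION_TIME` hint itself. The validation step assumes the constraint that, to my knowledge, Flink 1.10's `TableConfig#setIdleStateRetentionTime` enforces: `maxTime` must be at least 5 minutes greater than `minTime` unless both are zero. The example hint (5m/11m) satisfies it.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for an SQL-on-Flink framework: it extracts a custom
// IDLE_STATE_RETENTION_TIME hint from a query. Not a Flink feature.
class RetentionHintParser {

    // Parsed (minTime, maxTime) pair.
    static final class Retention {
        final Duration minTime;
        final Duration maxTime;

        Retention(Duration minTime, Duration maxTime) {
            this.minTime = minTime;
            this.maxTime = maxTime;
        }
    }

    // Matches e.g. /*+ IDLE_STATE_RETENTION_TIME(minTime ='5m', maxTime ='11m') */
    // with flexible whitespace; minTime is assumed to come before maxTime.
    private static final Pattern HINT = Pattern.compile(
            "/\\*\\+\\s*IDLE_STATE_RETENTION_TIME\\s*\\(\\s*minTime\\s*=\\s*'([^']*)'"
                    + "\\s*,\\s*maxTime\\s*=\\s*'([^']*)'\\s*\\)\\s*\\*/");

    // Assumed duration format: a non-negative integer followed by s, m, h, or d.
    private static final Pattern DURATION = Pattern.compile("(\\d+)([smhd])");

    static Duration parseDuration(String text) {
        Matcher m = DURATION.matcher(text.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("Unsupported duration: " + text);
        }
        long amount = Long.parseLong(m.group(1));
        switch (m.group(2)) {
            case "s":
                return Duration.ofSeconds(amount);
            case "m":
                return Duration.ofMinutes(amount);
            case "h":
                return Duration.ofHours(amount);
            default: // "d"
                return Duration.ofDays(amount);
        }
    }

    // Returns empty if the query has no hint, so the caller can keep its default.
    static Optional<Retention> parse(String sql) {
        Matcher m = HINT.matcher(sql);
        if (!m.find()) {
            return Optional.empty();
        }
        Duration min = parseDuration(m.group(1));
        Duration max = parseDuration(m.group(2));
        boolean cleanupDisabled = min.isZero() && max.isZero();
        // Assumed to mirror Flink 1.10's TableConfig check: maxTime must exceed
        // minTime by at least 5 minutes unless both are zero.
        if (!cleanupDisabled && max.minus(min).compareTo(Duration.ofMinutes(5)) < 0) {
            throw new IllegalArgumentException(
                    "maxTime must be at least 5 minutes greater than minTime");
        }
        return Optional.of(new Retention(min, max));
    }

    public static void main(String[] args) {
        String sql = "SELECT /*+ IDLE_STATE_RETENTION_TIME(minTime ='5m', maxTime ='11m') */ * FROM t";
        parse(sql).ifPresent(r -> System.out.println(r.minTime + " / " + r.maxTime));
    }
}
```

A query without the hint yields an empty result, so the framework can keep its default. With a parsed value `r`, the framework would then call `tEnv.getConfig().setIdleStateRetentionTime(Time.milliseconds(r.minTime.toMillis()), Time.milliseconds(r.maxTime.toMillis()))` right before translating that query, which depends on the eager-translation behavior discussed in this thread.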



(In reply to Jiahui Jiang, Sunday, April 12, 2020 4:30 PM)

godfrey he
Hi Jiahui,

Query hints are one way to do fine-grained configuration. Just out of curiosity, is it a strong requirement that users configure a different IDLE_STATE_RETENTION_TIME for each operator?

Best,
Godfrey

(In reply to Jiahui Jiang, Tuesday, April 14, 2020 2:07 AM)

Jiahui Jiang
Hey Godfrey, in some of our users' use cases, there are a couple of complex join queries where the key domains keep evolving, and we definitely want some sort of state retention for those queries. But there are other queries where the key domain doesn't evolve over time, and there is no real guarantee on the maximum gap between two records with the same key appearing in the stream; we don't want to accidentally invalidate the state for those keys.

Because queries with different requirements can coexist in the same pipeline, I think we have to configure `IDLE_STATE_RETENTION_TIME` per operator.

Just wondering: has a similar requirement (being able to set table/query configuration inside SQL queries) not come up much for SQL users before?

We are also a little concerned: since 'toRetractStream(Table, Class, QueryConfig)' is deprecated, relying on the fact that TableConfig is read during toDataStream feels like relying on an implementation detail that just happens to work, with no guarantee that it will keep working in future versions...

Thanks!

(In reply to godfrey he, Monday, April 13, 2020 9:51 PM)

godfrey he
Hi Jiahui,

I think this is a problem of multiple-sink optimization. If we optimize each sink eagerly (that is, we optimize the query when `writeToSink` or `insertInto` is called), `TableConfig#setIdleStateRetentionTime` is functionally equivalent to QueryConfig, which requires calling `TableConfig#setIdleStateRetentionTime` before calling `writeToSink` or `insertInto`. However, if we use multiple-sink optimization, it's hard to map each value of `TableConfig#setIdleStateRetentionTime` to its query. I think this is a common issue for per-query configuration under multiple-sink optimization.

But for the `toRetractStream` method, we keep the eager optimization strategy, so you can call `TableConfig#setIdleStateRetentionTime` before `toRetractStream`.

Best,
Godfrey

On Tue, 14 Apr 2020 at 12:15, Jiahui Jiang <[hidden email]> wrote:
Hey Godfrey, some of our users' use cases have a couple of complex join queries where the key domains keep evolving, and we definitely want some sort of state retention for those queries. But there are others where the key domain doesn't evolve over time, yet there is no real guarantee on the maximum gap between two records with the same key appearing in the stream. For those streams we don't want to accidentally invalidate the state for those keys.

Because queries with different requirements can coexist in the same pipeline, I think we have to configure `IDLE_STATE_RETENTION_TIME` per operator.
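The failure mode described here can be sketched as a small simulation of min/max idle-state retention. This is a simplified model, not Flink's actual implementation. It is based on my understanding of the legacy planner's cleanup-timer behavior: state is kept for at least `minRetention` and at most `maxRetention` after a key's last access, and 0 disables cleanup. `IdleStateSim` and its methods are hypothetical. Times are plain numbers, e.g. minutes.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

// Simplified simulation, NOT Flink's implementation, of min/max idle-state retention:
// a key's state lives at least minRetention and at most maxRetention after its last access.
// maxRetention == 0 disables cleanup. Times are plain longs (e.g. minutes).
final class IdleStateSim {
    private static final class CleanupTimer {
        final long time;
        final String key;

        CleanupTimer(long time, String key) {
            this.time = time;
            this.key = key;
        }
    }

    private final long minRetention;
    private final long maxRetention;
    private final Map<String, Long> countState = new HashMap<>();  // per-key operator state
    private final Map<String, Long> cleanupTime = new HashMap<>(); // latest timer per key
    private final PriorityQueue<CleanupTimer> timers =
            new PriorityQueue<>(Comparator.comparingLong((CleanupTimer t) -> t.time));

    IdleStateSim(long minRetention, long maxRetention) {
        this.minRetention = minRetention;
        this.maxRetention = maxRetention;
    }

    // Fires every timer due at or before `now`; only a key's latest timer clears its state.
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.peek().time <= now) {
            CleanupTimer t = timers.poll();
            Long latest = cleanupTime.get(t.key);
            if (latest != null && latest.longValue() == t.time) {
                countState.remove(t.key);
                cleanupTime.remove(t.key);
            }
        }
    }

    // Processes one record for `key` at time `now` and returns the key's updated count.
    long access(String key, long now) {
        advanceTo(now);
        long count = countState.getOrDefault(key, 0L) + 1;
        countState.put(key, count);
        if (maxRetention > 0) {
            Long latest = cleanupTime.get(key);
            // Push cleanup out only if the existing timer would fire before now + minRetention.
            if (latest == null || now + minRetention > latest) {
                long next = now + maxRetention;
                cleanupTime.put(key, next);
                timers.add(new CleanupTimer(next, key));
            }
        }
        return count;
    }

    boolean hasState(String key) {
        return countState.containsKey(key);
    }
}
```

With (5, 11), a key seen at minutes 0, 3, and 7 keeps its state until minute 18. A record arriving at minute 20 then starts from an empty count. For the stable-key streams described above, that silent reset is exactly the risk. This is why those queries want cleanup disabled, while the evolving-key joins want it enabled.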

Just wondering, has a similar requirement not come up much for SQL users before? (That is, being able to set table/query configuration inside SQL queries.)

We are also a little concerned because, now that 'toRetractStream(Table, Class, QueryConfig)' is deprecated, relying on the fact that TableConfig is read during toDataStream feels like relying on an implementation detail that just happens to work. There is no guarantee that it will keep working in future versions...

Thanks!

From: godfrey he <[hidden email]>
Sent: Monday, April 13, 2020 9:51 PM
To: Jiahui Jiang <[hidden email]>
Cc: Jark Wu <[hidden email]>; [hidden email] <[hidden email]>
Subject: Re: Setting different idleStateRetentionTime for different queries executed in the same TableEnvironment in Flink 1.10
 
Hi Jiahui,

Query hints are one way to do fine-grained configuration. Just out of curiosity, is it a strong requirement that users configure a different IDLE_STATE_RETENTION_TIME for each operator?

Best,
Godfrey

On Tue, 14 Apr 2020 at 02:07, Jiahui Jiang <[hidden email]> wrote:
Also, for some more context: we are building a framework to help users build their Flink pipelines with SQL. Our framework handles all the setup and configuration, so users only need to write SQL queries and don't need any Flink knowledge.

One issue we encountered is that, for some of the streams, the key domain keeps evolving and we want to expire the state for older keys. But there is no easy way to let users configure their state timeout directly through the SQL APIs.
Currently we ask users to configure idleStateRetentionTime in a custom SQL hint. Our framework then parses it and sets it up at table registration time.

An example query that users can write right now looks like this:

CREATE TABLE `/output` AS
SELECT /*+ IDLE_STATE_RETENTION_TIME(minTime ='5m', maxTime ='11m') */ *
FROM `/input1` a
INNER JOIN `/input2` b
ON a.column_name = b.column_name;


Is this something Flink SQL might want to support out of the box? (Calcite has provided first-class hint parsing since version 1.22.0.)
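A minimal sketch of the framework-side parsing described above. It assumes hints always take the exact `IDLE_STATE_RETENTION_TIME(minTime ='<n><unit>', maxTime ='<n><unit>')` shape, with the unit one of `s`, `m`, `h`, `d`. `RetentionHintParser` is hypothetical and uses a plain regex rather than Calcite's hint support. The sketch also assumes, from my reading of Flink 1.10, that `TableConfig#setIdleStateRetentionTime` rejects non-zero pairs whose max is less than 5 minutes above the min. The check is repeated here so that bad hints fail early.

```java
import java.time.Duration;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical parser for the custom IDLE_STATE_RETENTION_TIME hint shown above.
final class RetentionHintParser {
    // Matches the hint comment, tolerating whitespace around '=' and between arguments.
    private static final Pattern HINT = Pattern.compile(
            "/\\*\\+\\s*IDLE_STATE_RETENTION_TIME\\(\\s*minTime\\s*=\\s*'(\\d+)([smhd])'\\s*,"
                    + "\\s*maxTime\\s*=\\s*'(\\d+)([smhd])'\\s*\\)\\s*\\*/");

    // Parsed (min, max) retention pair.
    static final class Retention {
        final Duration min;
        final Duration max;

        Retention(Duration min, Duration max) {
            this.min = min;
            this.max = max;
        }
    }

    // Returns the retention declared in the SQL text, or null if no hint is present.
    static Retention parse(String sql) {
        Matcher m = HINT.matcher(sql);
        if (!m.find()) {
            return null;
        }
        Duration min = toDuration(Long.parseLong(m.group(1)), m.group(2));
        Duration max = toDuration(Long.parseLong(m.group(3)), m.group(4));
        // Assumption: mirrors the 5-minute minimum gap Flink 1.10 enforces for non-zero values.
        boolean disabled = min.isZero() && max.isZero();
        if (!disabled && max.minus(min).compareTo(Duration.ofMinutes(5)) < 0) {
            throw new IllegalArgumentException("maxTime must be at least 5 minutes larger than minTime");
        }
        return new Retention(min, max);
    }

    private static Duration toDuration(long amount, String unit) {
        switch (unit) {
            case "s": return Duration.ofSeconds(amount);
            case "m": return Duration.ofMinutes(amount);
            case "h": return Duration.ofHours(amount);
            case "d": return Duration.ofDays(amount);
            default: throw new IllegalArgumentException("Unknown unit: " + unit);
        }
    }
}
```

The parsed pair would then be applied through `TableConfig#setIdleStateRetentionTime` right before that query is translated. A regex is enough for this fixed shape. Reading the hint through the SQL parser would be more robust if the hint grammar grows.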



From: Jiahui Jiang <[hidden email]>
Sent: Sunday, April 12, 2020 4:30 PM
To: Jark Wu <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: Setting different idleStateRetentionTime for different queries executed in the same TableEnvironment in Flink 1.10
 
Hey Jark, thank you so much for confirming!

Out of curiosity: even though I agree that having too many config classes is confusing, not knowing when the config values are used during pipeline setup is also pretty confusing. For example, the name 'TableConfig' makes me feel that it's global to the whole tableEnvironment (which is true) and that it's only read once at execution (which is not true). Could we surface, or add some documentation on, when these configs are read? 😄

Thank you so much!

From: Jark Wu <[hidden email]>
Sent: Saturday, April 11, 2020 8:45 AM
To: Jiahui Jiang <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: Setting different idleStateRetentionTime for different queries executed in the same TableEnvironment in Flink 1.10
 
Yes, that's right. Setting idleStateRetentionTime on TableConfig before translation should work.

On Sat, 11 Apr 2020 at 14:46, Jiahui Jiang <[hidden email]> wrote:
Thank you for answering! I was reading StreamExecutionEnvironmentImpl/StreamPlanner, and it seems to me that when converting tables to DataStreams, planner.translate takes the current tableConfig into account. That is, it reads the current tableConfig content even though the config isn't explicitly passed as an argument to translate. So it seems that if I set tableConfig right before converting to DataStreams, that should work?

Or did you mean the actual tableEnvironment.execute()? We have a whole pipeline with multiple queries that depend on each other, so we need all the continuous queries to execute concurrently.

Thanks again!

From: Jark Wu <[hidden email]>
Sent: Saturday, April 11, 2020 1:24 AM
To: Jiahui Jiang <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: Setting different idleStateRetentionTime for different queries executed in the same TableEnvironment in Flink 1.10
 
Hi Jiahui,

QueryConfig is deprecated and will be removed in the future, because having so many different config classes in the Table API is confusing.
If you want to set a different idleStateRetentionTime for different queries, you can set a new idleStateRetentionTime on TableConfig before executing/submitting each query.

Best,
Jark

On Sat, 11 Apr 2020 at 09:21, Jiahui Jiang <[hidden email]> wrote:
Just looked into the source code a bit further and realized that in StreamTableEnvironmentImpl, translation is done lazily even for sinks. Is there any way to give different transformations different queryConfigs?


Re: Setting different idleStateRetentionTime for different queries executed in the same TableEnvironment in Flink 1.10

godfrey he
Hi Jiahui,

Thanks for your suggestions.
I think we may need a more detailed explanation of the behavior change.
Regarding "supporting query configuration using Hints", I think it's one possible approach, but it needs more discussion.

Best,
Godfrey

On Tue, 14 Apr 2020 at 19:46, Jiahui Jiang <[hidden email]> wrote:
Yep yep :) I'm aware that the difference between the Blink planner and the legacy Flink planner here only applies to sinks.

But since toDataStream doesn't take a query-level config at the API level, it's easy for people to assume they can't control retention on a per-query basis without digging into the source code.

I have two questions / suggestions here:

1. Since StreamQueryConfig is deprecated and we want to consolidate config classes, could we add an overload like .toRetractStream(Table, Class, minRetentionTime, maxRetentionTime)? Or at least add some Javadoc, so that I don't have to worry about the behavior under the hood suddenly changing?
2. What do you think about making query configuration via hints a first-class supported Flink feature?

Thank you so much 😊


Re: Setting different idleStateRetentionTime for different queries executed in the same TableEnvironment in Flink 1.10

Jark Wu-3
Hi Jiahui,

Thanks for the input.
It's a very common scenario to set specific configuration on particular operators (e.g. parallelism, join strategy).
Supporting query hints is definitely on our roadmap, but it may happen in 1.12.
Supporting state TTL in query hints sounds reasonable to me.

Best,
Jark


Re: Setting different idleStateRetentionTime for different queries executed in the same TableEnvironment in Flink 1.10

Jiahui Jiang
Good to know! Thank you so much for all the responses again :)

From: Jark Wu <[hidden email]>
Sent: Tuesday, April 14, 2020 10:51 PM
To: godfrey he <[hidden email]>
Cc: Jiahui Jiang <[hidden email]>; user <[hidden email]>
Subject: Re: Setting different idleStateRetentionTime for different queries executed in the same TableEnvironment in Flink 1.10
 
Hi Jiahui,

Thanks for the inputs. 
It's a very common scenario to set specific configuration on some dedicate operators (e.g. parallelism, join strategy).
And supporting query hints is definitely on our roadmap, but may happen in 1.12. 
Support state ttl in query hints sounds reasonable to me. 

Best,
Jark

On Wed, 15 Apr 2020 at 09:45, godfrey he <[hidden email]> wrote:
Hi Jiahui,

Thanks for your suggestions.
I think we may need more detailed explanation about the behavior change.
Regarding to "supporting query configuration using Hints", I think it's a one kind of approach, but we need more discussion. 

Best,
Godfrey

Jiahui Jiang <[hidden email]> 于2020年4月14日周二 下午7:46写道:
Yep yep :) I’m aware of the difference here for Blink and legacy Flink planner is only for sinks.

But since on the API level toDataStream doesn’t take in a query level config, so it’s easy for people to think they can’t control it on a per query basis without digging into the source code. 

I have two questions / suggestions here:

1. Since StreamQueryConfig is deprecated and we want to consolidate config classes, can we maybe add an additional endpoint like .toRetractStream(Table, Class, minRetentionTime, maxRetentionTime)? Or at least add some Java docs so that I won’t worry about the behavior under the hook suddenly change?
2. What do we think about supporting query configuration using Hints to be a first class supported Flink feature?

Thank you so much 😊

From: godfrey he <[hidden email]>
Sent: Tuesday, April 14, 2020 3:20 AM
To: Jiahui Jiang <[hidden email]>
Cc: Jark Wu <[hidden email]>; [hidden email] <[hidden email]>
Subject: Re: Setting different idleStateRetentionTime for different queries executed in the same TableEnvironment in Flink 1.10
 
Hi Jiahui,

I think this is the problem of multiple sinks optimization. If we optimize each sink eager (that means we optimize the query when we call `writeToSink` or `insertInto`), `TableConfig#setIdleStateRetentionTime` is functionally equivalent to QueryConfig.  which require we need call `TableConfig#setIdleStateRetentionTime` before call `writeToSink` or `insertInto`.  While, If we use multiple sinks optimization, It's hard to map the value of `TableConfig#setIdleStateRetentionTime` to each query. I think it's a common issue for configuring for per query on multiple sinks optimization.

but for `toRetractStream` method, we keep eager optimization strategy. So you can call `TableConfig#setIdleStateRetentionTime` before `toRetractStream`.

Best,
Godfrey

Jiahui Jiang <[hidden email]> 于2020年4月14日周二 下午12:15写道:
Hey Godfrey, in some of the use cases our users have, they have a couple of complex join queries where the key domains key evolving - we definitely want some sort of state retention for those queries; but there are other where the key domain doesn't evolve overtime, but there isn't really a guarantee on what's the maximum gap between 2 records of the same key to appear in the stream, we don't want to accidentally invalidate the state for those keys in these streams.

Because of queries with different requirements can both exist in the pipeline, I think we have to config `IDLE_STATE_RETENTION_TIME` per operator.

Just wondering, has similar requirement not come up much for SQL users before? (being able to set table / query configuration inside SQL queries)

We are also a little bit concerned because right now since 'toRetractStream(Table, Class, QueryConfig)' is deprecated, relying on the fact that TableConfig is read during toDataStream feels like relying on an implementation details that just happens to work, and there is no guarantee that it will keep working in the future versions...

Thanks!

From: godfrey he <[hidden email]>
Sent: Monday, April 13, 2020 9:51 PM
To: Jiahui Jiang <[hidden email]>
Cc: Jark Wu <[hidden email]>; [hidden email] <[hidden email]>
Subject: Re: Setting different idleStateRetentionTime for different queries executed in the same TableEnvironment in Flink 1.10
 
Hi Jiahui,

Query hint is a way for fine-grained configuration. 
 just out of curiosity, is it a strong requirement
 that users need to config different IDLE_STATE_RETENTION_TIME for each operator?

Best,
Godfrey

Jiahui Jiang <[hidden email]> 于2020年4月14日周二 上午2:07写道:
Also for some more context, we are building a framework to help users build their Flink pipeline with SQL. Our framework handles all the setup and configuration, so that users only need to write the SQL queries without having to have any Flink knowledge.

One issue we encountered was, for some of the streams, the key domain keeps evolving and we want to expire the states for older keys. But there is no easy ways to allow users configure their state timeout directly through SQL APIs.
Currently we are asking users to configure idleStateRetentionTime in a custom SQL hint, then our framework will parse it and set it up during table registration time.

An example query that users can be writing right now looks like,

CREATE TABLE `/output` AS

SELECT /*+ IDLE_STATE_RETENTION_TIME(minTime ='5m', maxTime ='11m') */ *

FROM `/input1` a

INNER JOIN `/input2` b

ON a.column_name = b.column_name;


Is this something Flink SQL may want to support out of the box? (Starting from Calcite 1.22.0, it started to provide first class hint parsing)



From: Jiahui Jiang <[hidden email]>
Sent: Sunday, April 12, 2020 4:30 PM
To: Jark Wu <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: Setting different idleStateRetentionTime for different queries executed in the same TableEnvironment in Flink 1.10
 
Hey Jark, thank you so much for confirming!

Out of curiosity, even though I agree that having too many config classes are confusing, not knowing when the config values are used during pipeline setup is also pretty confusing. For example, the name of 'TableConfig' makes me feel it's global to the whole tableEnvironment (which is true) but is only read once at execution (which is not true). Can we try to surface or add some documentation on when are these configs are read? 😄

Thank you so much!

From: Jark Wu <[hidden email]>
Sent: Saturday, April 11, 2020 8:45 AM
To: Jiahui Jiang <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: Setting different idleStateRetentionTime for different queries executed in the same TableEnvironment in Flink 1.10
 
Yes, that's right. Set idleStateRetentionTime on TableConfig before translation should work.

On Sat, 11 Apr 2020 at 14:46, Jiahui Jiang <[hidden email]> wrote:
Thank you for answering! I was reading StreamExecutionEnvironmentImpl/StreamPlanner, and it seems to me that when trying to convert tables to DataStreams, planner.translate is taking the current tableConfig into account (aa in it reads the current tableConfig content even though it’s not explicitly passed in as an argument for translate). So seems like if I set tableConfig right before converting to DataStreams that should work?

Or did you mean the actual tableEnvironment.execute()? We have a whole pipeline of multiple queries that depend on each other, so all the continuous queries have to execute concurrently.

Thanks again!

From: Jark Wu <[hidden email]>
Sent: Saturday, April 11, 2020 1:24 AM
To: Jiahui Jiang <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: Setting different idleStateRetentionTime for different queries executed in the same TableEnvironment in Flink 1.10
 
Hi Jiahui,

QueryConfig is deprecated and will be removed in the future, because having so many different config classes in the Table API is confusing.
If you want to set a different idleStateRetentionTime for different queries, you can set a new idleStateRetentionTime on TableConfig before executing/submitting each query.

Best,
Jark

On Sat, 11 Apr 2020 at 09:21, Jiahui Jiang <[hidden email]> wrote:
I just looked into the source code a bit further and realized that StreamTableEnvironmentImpl translates lazily even for sinks. Is there any way to give different transformations different queryConfigs?

From: Jiahui Jiang <[hidden email]>
Sent: Friday, April 10, 2020 6:46 PM
To: [hidden email] <[hidden email]>
Subject: Setting different idleStateRetentionTime for different queries executed in the same TableEnvironment in Flink 1.10
 
Hello! I'm using the Table API to write a pipeline with multiple queries, and I want to set a different idleStateRetentionTime for different queries.

In Flink 1.8, it seems that I can pass in a streamQueryConfig when converting each output table into a DataStream, and the translation will take the idleStateRetentionTime into account.

But in Flink 1.10, idleStateRetentionTime is set on TableConfig and applies to the whole tableEnvironment.

Is there a way to have different idleStateRetentionTime for different queries in 1.10?

I saw that tableEnvironment.insertInto(sink, queryConfig) still allows eager translation. But does that mean that if I have multiple sinks for the same DataStream with different idleStateRetentionTime configurations, the transformation will be executed multiple times?

Thank you!