where does flink store the intermediate results of a join and what is the key?


kant kodali
Hi All,

If I run a query like this

StreamTableEnvironment.sqlQuery("select * from table1 join table2 on table1.col1 = table2.col1")

1) Where will Flink store the intermediate result? Assume flink-conf.yaml sets state.backend to 'rocksdb'.

2) If the intermediate results are stored in RocksDB, what are the key and value in this case (given the query above)?

3) What is the best way to query these intermediate results from an external application, both while the job is running and while it is not?

Thanks!
Re: where does flink store the intermediate results of a join and what is the key?

Jark Wu-3
Hi Kant,

1) Yes, it will be stored in the RocksDB state backend.
2) In the old planner, the left and right states have the same structure: `<col1, MapState<Row, Tuple2<count, expiredTime>>>`.
    This is a two-level map. `col1`, the join key, is the first-level key of the state. The key of the MapState is the input row,
    `count` is the number of times that row has occurred, and `expiredTime` indicates when to clean up the row (to keep the state from growing without bound). You can find the source code here [1].
    In the Blink planner, the state structure is more complex and is determined by the meta-information of the upstream. You can see the Blink planner source code here [2].
3) Currently, the intermediate state is not exposed to users. Usually, users write the query result to an external system (such as MySQL) and query that system.
    Querying the intermediate state is on the roadmap, but I guess it is not in the 1.11 plan.

Best,
Jark
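
The two-level structure described in point 2 above can be pictured with plain Java collections. This is only an illustrative sketch under stated assumptions, not Flink's implementation: `JoinSideState`, `Entry`, `add`, `count`, `expire`, and `keyCount` are hypothetical names, a `String` stands in for `Row`, and a `HashMap` stands in for keyed `MapState`.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for one side of the join state:
// join key -> (input row -> [count, expiredTime]).
class JoinSideState {
    // Hypothetical holder mirroring Tuple2<count, expiredTime>.
    static final class Entry {
        long count;
        long expiredTime;

        Entry(long count, long expiredTime) {
            this.count = count;
            this.expiredTime = expiredTime;
        }
    }

    private final Map<String, Map<String, Entry>> state = new HashMap<>();

    // Record one occurrence of `row` under `joinKey`.
    // A duplicate row only bumps the counter and refreshes the expiry time.
    void add(String joinKey, String row, long expiredTime) {
        Map<String, Entry> rows = state.computeIfAbsent(joinKey, k -> new HashMap<>());
        Entry e = rows.get(row);
        if (e == null) {
            rows.put(row, new Entry(1, expiredTime));
        } else {
            e.count++;
            e.expiredTime = expiredTime;
        }
    }

    // Number of stored occurrences of `row` under `joinKey` (0 if absent).
    long count(String joinKey, String row) {
        Map<String, Entry> rows = state.get(joinKey);
        Entry e = rows == null ? null : rows.get(row);
        return e == null ? 0 : e.count;
    }

    // Drop every row whose expiredTime is at or before `now`,
    // then drop join keys that no longer hold any rows.
    void expire(long now) {
        for (Map<String, Entry> rows : state.values()) {
            rows.values().removeIf(e -> e.expiredTime <= now);
        }
        state.values().removeIf(Map::isEmpty);
    }

    // Number of join keys that currently hold at least one row.
    int keyCount() {
        return state.size();
    }
}
```

In this picture a join would keep one such structure per input side and, for each arriving row, look up the other side's rows under the same join key.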



Re: where does flink store the intermediate results of a join and what is the key?

kant kodali
Hi Jark,

1) Shouldn't it be a map from col1 to a list of rows? Multiple rows can have the same join key, right?

2) Can I use the State Processor API from an external application to query the intermediate results in near real time? I thought querying RocksDB state was a widely requested feature. It would be really great to consider this feature for 1.11.

3) Is there an interface I can implement to provide my own state backend?

Thanks!


Re: where does flink store the intermediate results of a join and what is the key?

Jark Wu-3
Hi Kant,

1) A list of rows is also sufficient in this case. A MapState is used so that a row can be retracted faster and so that the state takes less storage.

2) The State Processor API is usually used to process savepoints. I'm afraid its performance is not good enough for querying.
    In addition, AFAIK, the State Processor API requires the uid of the operator, but operator uids are not set in Table API & SQL.
    So I'm not sure whether it works or not.

3) You can write a custom state backend by implementing the org.apache.flink.runtime.state.StateBackend interface and use it via `env.setStateBackend(…)`.

Best,
Jark
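
Point 1 above can be made concrete with a small sketch. It is not Flink code: `CountedRows` and its methods are hypothetical names, and a `String` stands in for `Row`. The assumption illustrated is that a per-row counter makes a retraction a single map lookup and stores duplicate rows once, whereas a list would have to be scanned and would hold every duplicate.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative row store that keeps one counter per distinct row.
class CountedRows {
    private final Map<String, Integer> counts = new HashMap<>();

    // Store one more occurrence of `row`.
    void accumulate(String row) {
        counts.merge(row, 1, Integer::sum);
    }

    // Retract one occurrence of `row` with a single lookup.
    // Returns false if the row was never stored.
    boolean retract(String row) {
        Integer c = counts.get(row);
        if (c == null) {
            return false;
        }
        if (c == 1) {
            counts.remove(row);
        } else {
            counts.put(row, c - 1);
        }
        return true;
    }

    // Number of distinct rows physically stored.
    int storedEntries() {
        return counts.size();
    }

    // Expand back to the equivalent list form, one element per occurrence.
    List<String> toList() {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            for (int i = 0; i < e.getValue(); i++) {
                out.add(e.getKey());
            }
        }
        return out;
    }
}
```

With three copies of one row and one copy of another, the map holds two entries while the equivalent list holds four elements, which is the storage saving mentioned above.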

Re: where does flink store the intermediate results of a join and what is the key?

kant kodali
Is it common practice to have a custom state backend? If so, what would be a popular custom backend?

Can I use Elasticsearch as a state backend?

Thanks!

Re: where does flink store the intermediate results of a join and what is the key?

Benoît Paris-2
Hi all!

@Jark, out of curiosity, would you be so kind as to expand a bit on "Query on the intermediate state is on the roadmap"?
Are you referring to working on QueryableStateStream/QueryableStateClient [1], or around "FOR SYSTEM_TIME AS OF" [2], or on other APIs/concepts (is there a FLIP?)?

Cheers
Ben






--
Benoît Paris
Ingénieur Machine Learning Explicable
Tél : +33 6 60 74 23 00   
http://benoit.paris
http://explicable.ml
Re: where does flink store the intermediate results of a join and what is the key?

Jark Wu-3
Hi Kant,
Writing a custom state backend is very difficult and is not recommended.

Hi Benoît,
Yes, by "Query on the intermediate state is on the roadmap" I meant integrating Table API & SQL with Queryable State.
There is also an early issue, FLINK-6968, that tracks this.

Best,
Jark


Re: where does flink store the intermediate results of a join and what is the key?

Benoît Paris-2
Dang, what a massive PR: 2,118 files changed, +104,104 −29,161 lines.
Thanks for the details, Jark!

Re: where does flink store the intermediate results of a join and what is the key?

Arvid Heise-3
Hi Kant,

I just wanted to mention the obvious: if you add a ProcessFunction right after the join, you could maintain user state holding the same result. That will of course double the data volume, but it may still be better than writing to an external system.
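
The idea can be sketched without any Flink dependency. The class below is a hypothetical stand-in for the user state that a function placed after the join would maintain: it consumes the join output as add/retract changes and keeps the current result indexed by join key. In a real job this logic would live in a keyed function with Flink-managed state; `JoinResultMirror`, `onChange`, and `lookup` are illustrative names only, and a `String` stands in for a joined row.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative copy of the join result, indexed by join key.
class JoinResultMirror {
    private final Map<String, List<String>> byKey = new HashMap<>();

    // Apply one change from the join output.
    // isAdd == true adds a result row; false retracts one occurrence of it.
    void onChange(boolean isAdd, String joinKey, String joinedRow) {
        if (isAdd) {
            byKey.computeIfAbsent(joinKey, k -> new ArrayList<>()).add(joinedRow);
            return;
        }
        List<String> rows = byKey.get(joinKey);
        if (rows != null) {
            rows.remove(joinedRow); // removes the first matching occurrence, if any
            if (rows.isEmpty()) {
                byKey.remove(joinKey);
            }
        }
    }

    // Read-only view of the current result rows for one join key.
    List<String> lookup(String joinKey) {
        List<String> rows = byKey.getOrDefault(joinKey, Collections.emptyList());
        return Collections.unmodifiableList(rows);
    }
}
```

Because the mirror holds the same rows the join already keeps internally, it is where the doubling of data volume comes from.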

Re: where does flink store the intermediate results of a join and what is the key?

kant kodali
Hi Arvid,

I am trying to understand your statement. I am new to Flink, so excuse me if I am missing something I should know. A ProcessFunction just processes the records, right? If so, how is it better than writing to an external system? At the end of the day I want to be able to query the state (it doesn't have to be through Queryable State, and I probably don't want to use Queryable State because of its limitations). Ideally, I want to query the intermediate state using SQL, and hopefully the store that maintains the intermediate state has some sort of index support so that read queries are faster than a full scan.

Also, I hear that querying intermediate state the way one would query a database is a widely requested feature, so it's a bit surprising that this is not solved yet, but I am hopeful!

Thanks!



On Tue, Jan 28, 2020 at 12:45 AM Arvid Heise <[hidden email]> wrote:
Hi Kant,

just wanted to mention the obvious. If you add a ProcessFunction right after the join, you could maintain a user state with the same result. That will of course blow up the data volume by a factor of 2, but may still be better than writing to an external system.

On Mon, Jan 27, 2020 at 6:09 PM Benoît Paris <[hidden email]> wrote:
Dang what a massive PR: Files changed2,118+104,104 −29,161 lines changed.
Thanks for the details, Jark!

On Mon, Jan 27, 2020 at 4:07 PM Jark Wu <[hidden email]> wrote:
Hi Kant,
Having a custom state backend is very difficult and is not recommended. 

Hi Benoît,
Yes, the "Query on the intermediate state is on the roadmap" I mentioned is referring to integrate Table API & SQL with Queryable State. 
We also have an early issue FLINK-6968 to tracks this. 

Best,
Jark


On Fri, 24 Jan 2020 at 00:26, Benoît Paris <[hidden email]> wrote:
Hi all!

@Jark, out of curiosity, would you be so kind as to expand a bit on "Query on the intermediate state is on the roadmap"?
Are you referring to working on QueryableStateStream/QueryableStateClient [1], or around "FOR SYSTEM_TIME AS OF" [2], or on other APIs/concepts (is there a FLIP?)?

Cheers
Ben



On Thu, Jan 23, 2020 at 6:40 AM kant kodali <[hidden email]> wrote:
Is it a common practice to have a custom state backend? if so, what would be a popular custom backend?

Can I do Elasticseatch as a state backend?

Thanks!

On Wed, Jan 22, 2020 at 1:42 AM Jark Wu <[hidden email]> wrote:
Hi Kant,

1) List of row is also sufficient in this case. Using a MapState is in order to retract a row faster, and save the storage size.

2) State Process API is usually used to process save point. I’m afraid the performance is not good to use it for querying. 
    On the other side, AFAIK, State Process API requires the uid of operator. However, uid of operators is not set in Table API & SQL.
    So I’m not sure whether it works or not.

3)You can have a custom statebackend by implement org.apache.flink.runtime.state.StateBackend interface, and use it via `env.setStateBackend(…)`.

Best,
Jark

On Wed, 22 Jan 2020 at 14:16, kant kodali <[hidden email]> wrote:
Hi Jark,

1) shouldn't it be a col1 to List of row? multiple rows can have the same joining key right?

2) Can I use state processor API from an external application to query the intermediate results in near real-time? I thought querying rocksdb state is a widely requested feature. It would be really great to consider this feature for 1.11

3) Is there any interface where I can implement my own state backend?

Thanks!



--
Benoît Paris
Ingénieur Machine Learning Explicable
Tél : +33 6 60 74 23 00   
http://benoit.paris
http://explicable.ml



Re: where does flink store the intermediate results of a join and what is the key?

Arvid Heise-3
Yes, the default is writing to an external system. Especially if you want SQL, there is currently no way around it.

The drawbacks of writing to external systems are: additional maintenance of another system and higher latency.

On Tue, Jan 28, 2020 at 11:49 AM kant kodali <[hidden email]> wrote:
Hi Arvid,

I am trying to understand your statement. I am new to Flink, so excuse me if I don't know something I should have known. A ProcessFunction just processes the records, right? If so, how is it better than writing to an external system? At the end of the day I want to be able to query it (it doesn't have to be through Queryable State, and I probably don't want to use Queryable State because of its limitations). Ideally I want to be able to query the intermediate state using SQL, and hopefully the store that maintains the intermediate state has some sort of index support so that read queries are faster than a full scan.

Also, I hear that querying intermediate state just like one would in a database is a widely requested feature, so it's a bit surprising that this is not solved yet, but I am hopeful!

Thanks!



On Tue, Jan 28, 2020 at 12:45 AM Arvid Heise <[hidden email]> wrote:
Hi Kant,

Just wanted to mention the obvious: if you add a ProcessFunction right after the join, you could maintain user state holding the same result. That will of course blow up the data volume by a factor of 2, but it may still be better than writing to an external system.
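
A rough sketch of the bookkeeping such a function would do, written as plain Java with no Flink dependencies so it can run standalone. It assumes the join result arrives as add/retract messages (a boolean flag plus a row, which is the shape `toRetractStream` produces). The `JoinResultView` class and its method names are hypothetical; in a real job this logic would sit inside something like a `KeyedProcessFunction`, with the map held in keyed state rather than on the heap.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of a second copy of the join result, indexed by join key. */
public class JoinResultView {
    // join key -> result rows currently valid for that key
    private final Map<String, List<String>> rowsByKey = new HashMap<>();

    /** Apply one changelog message: isAdd == true inserts the row, false retracts it. */
    public void apply(boolean isAdd, String joinKey, String row) {
        if (isAdd) {
            rowsByKey.computeIfAbsent(joinKey, k -> new ArrayList<String>()).add(row);
            return;
        }
        List<String> rows = rowsByKey.get(joinKey);
        if (rows != null) {
            rows.remove(row); // removes the first equal occurrence only
            if (rows.isEmpty()) {
                rowsByKey.remove(joinKey);
            }
        }
    }

    /** Point lookup by join key; returns a copy so callers cannot mutate the view. */
    public List<String> lookup(String joinKey) {
        List<String> rows = rowsByKey.get(joinKey);
        return rows == null ? new ArrayList<String>() : new ArrayList<String>(rows);
    }
}
```

The view is a full second copy of what the join already keeps internally, which is where the factor of 2 comes from.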

On Mon, Jan 27, 2020 at 6:09 PM Benoît Paris <[hidden email]> wrote:
Dang, what a massive PR: 2,118 files changed, +104,104 −29,161 lines.
Thanks for the details, Jark!

On Mon, Jan 27, 2020 at 4:07 PM Jark Wu <[hidden email]> wrote:
Hi Kant,
Having a custom state backend is very difficult and is not recommended. 

Hi Benoît,
Yes, the "Query on the intermediate state is on the roadmap" I mentioned refers to integrating Table API & SQL with Queryable State.
We also have an early issue, FLINK-6968, that tracks this.

Best,
Jark


On Fri, 24 Jan 2020 at 00:26, Benoît Paris <[hidden email]> wrote:
Hi all!

@Jark, out of curiosity, would you be so kind as to expand a bit on "Query on the intermediate state is on the roadmap"?
Are you referring to working on QueryableStateStream/QueryableStateClient [1], or around "FOR SYSTEM_TIME AS OF" [2], or on other APIs/concepts (is there a FLIP?)?

Cheers
Ben



On Thu, Jan 23, 2020 at 6:40 AM kant kodali <[hidden email]> wrote:
Is it common practice to have a custom state backend? If so, what would be a popular custom backend?

Can I use Elasticsearch as a state backend?

Thanks!

On Wed, Jan 22, 2020 at 1:42 AM Jark Wu <[hidden email]> wrote:
Hi Kant,

1) A list of rows would also be sufficient in this case. MapState is used so that a row can be retracted faster and to reduce the storage size.

2) The State Processor API is usually used to process savepoints. I'm afraid the performance would not be good enough to use it for querying.
    In addition, AFAIK, the State Processor API requires the uid of the operator. However, operator uids are not set in Table API & SQL.
    So I'm not sure whether it works or not.

3) You can have a custom state backend by implementing the org.apache.flink.runtime.state.StateBackend interface, and use it via `env.setStateBackend(…)`.
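
To illustrate point 1, here is a minimal plain-Java sketch (no Flink dependencies) of the two-level structure described earlier in the thread: join key, then row, then count. The class name `JoinSideStateModel` and its methods are hypothetical, and the expiry timestamp is left out for brevity. It only models why a row-to-count map makes retraction a single lookup and stores duplicate rows once; it is not the planner's actual code.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of one join side's state: joinKey -> (row -> count). */
public class JoinSideStateModel {
    private final Map<String, Map<List<Object>, Long>> state = new HashMap<>();

    /** Accumulate a row under its join key; a duplicate row only bumps its count. */
    public void add(String joinKey, List<Object> row) {
        state.computeIfAbsent(joinKey, k -> new HashMap<List<Object>, Long>())
             .merge(row, 1L, Long::sum);
    }

    /** Retract one occurrence of a row via a hash lookup instead of a list scan. */
    public void retract(String joinKey, List<Object> row) {
        Map<List<Object>, Long> rows = state.get(joinKey);
        if (rows == null) {
            return;
        }
        // Returning null from the remapping function removes the entry.
        rows.computeIfPresent(row, (r, c) -> c > 1 ? Long.valueOf(c - 1) : null);
        if (rows.isEmpty()) {
            state.remove(joinKey);
        }
    }

    /** Number of stored occurrences of the given row under the join key. */
    public long count(String joinKey, List<Object> row) {
        Map<List<Object>, Long> rows = state.get(joinKey);
        return rows == null ? 0L : rows.getOrDefault(row, 0L);
    }

    /** Number of distinct rows physically stored under the join key. */
    public int distinctRows(String joinKey) {
        Map<List<Object>, Long> rows = state.get(joinKey);
        return rows == null ? 0 : rows.size();
    }
}
```

With a plain list, the same row arriving twice would be stored twice, and retracting it would mean scanning the list for a match.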

Best,
Jark


