Force Join Unique Key

Rex Fenley
Hello,

I have quite a few joins in my plan that show
leftInputSpec=[NoUniqueKey]
in the Flink UI. I know it can't truly be the case that there is no unique key, at least for some of the joins that I've evaluated.

Is there a way to hint to the join what the unique key is for a table?

Thanks!

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US

Re: Force Join Unique Key

Jark Wu-3
Hi Rex,

Currently, the unique key is inferred by the optimizer. However, the inference is not perfect.
There are known issues where the unique key is not derived correctly, e.g. FLINK-20036 (was this opened by you?). If you think you have hit the same case, please open an issue.

A query hint would be a nice way to do this, but it is not supported yet.
We have an issue to track query hint support, see FLINK-17173.

Best,
Jark
 

Re: Force Join Unique Key

Rex Fenley
Ok, what are the performance consequences then of having a join with NoUniqueKey if the left side's key actually is unique in practice?

Thanks!


Re: Force Join Unique Key

Jark Wu-3
Hi Rex,

Currently, the join operator may use one of 3 kinds of state structure, depending on the input key and join key information.

1) The input doesn't have a unique key => MapState<row, count>,
where the map key is the input row and the map value is the number of equal rows.

2) The input has a unique key, but the unique key is not a subset of the join key => MapState<UK, row>.
This is better than the above, because it has a shorter map key and
is more efficient when retracting records.

3) The input has a unique key, and the unique key is a subset of the join key => ValueState<row>.
This gives the best performance, because it only performs a "get" operation rather than a "seek" on RocksDB
for each record of the other input side.

Note: the join key is the key of the keyed states. 

You can see the implementation differences in org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews. 

Best,
Jark
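
As a rough illustration of the three cases listed above, the choice of state structure can be sketched in plain Java. This is a simplified model under stated assumptions, not Flink's actual code: the class `JoinStateChoice`, the `StateKind` enum and the column names are hypothetical, and the real differences are implemented in JoinRecordStateViews as mentioned above.

```java
import java.util.Set;

/** Hypothetical model of the three cases; not Flink's actual classes. */
final class JoinStateChoice {
    enum StateKind {
        MAP_ROW_TO_COUNT, // case 1: MapState<row, count>
        MAP_UK_TO_ROW,    // case 2: MapState<UK, row>
        VALUE_ROW         // case 3: ValueState<row>
    }

    /**
     * @param uniqueKey columns of the inferred unique key; empty means NoUniqueKey
     * @param joinKey   columns used as the join key
     */
    static StateKind choose(Set<String> uniqueKey, Set<String> joinKey) {
        if (uniqueKey.isEmpty()) {
            return StateKind.MAP_ROW_TO_COUNT;
        }
        // "unique key is a subset of join key": every unique-key column is a join-key column
        if (joinKey.containsAll(uniqueKey)) {
            return StateKind.VALUE_ROW;
        }
        return StateKind.MAP_UK_TO_ROW;
    }

    public static void main(String[] args) {
        // Column names below are made up for the example.
        System.out.println(choose(Set.of(), Set.of("user_id")));
        System.out.println(choose(Set.of("order_id"), Set.of("user_id")));
        System.out.println(choose(Set.of("user_id"), Set.of("user_id")));
    }
}
```

The point of the sketch is that the outcome depends only on what the optimizer inferred: if inference reports no unique key, case 1 is chosen even when the data happens to be unique in practice.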

Re: Force Join Unique Key

Rex Fenley
Thanks for the info.

So even if there is no unique key inferred for a Row, the set of rows to join on each join key should effectively still be an O(1) lookup if the join key is unique, right?

Also, I've been digging around the code to find where the lookup of rows for a join key happens and haven't come across anything. Mind pointing me in the right direction?

Thanks!

cc Brad

Re: Force Join Unique Key

Jark Wu-3
Actually, if there is no unique key, it's not O(1), because there may be multiple rows joined by the join key, i.e. the operator iterates over all the values in the MapState under the current key. This is a "seek" operation on RocksDB, which is not efficient.

Are you asking where the join key is set? The join key is set by the framework via `AbstractStreamOperator#setKeyContextElement1`.

Best,
Jark
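
To make the no-unique-key case more concrete, here is a minimal in-memory sketch in plain Java. It is an assumption-laden stand-in, not Flink's implementation: `NoUniqueKeyState` and its methods are hypothetical names, rows are plain strings, and ordinary maps replace the keyed MapState<row, count>.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the no-unique-key case; not Flink's actual classes. */
final class NoUniqueKeyState {
    // join key -> (row -> number of identical rows), mirroring MapState<row, count>
    private final Map<String, Map<String, Integer>> state = new HashMap<>();

    void add(String joinKey, String row) {
        state.computeIfAbsent(joinKey, k -> new LinkedHashMap<>())
             .merge(row, 1, Integer::sum);
    }

    void retract(String joinKey, String row) {
        Map<String, Integer> rows = state.get(joinKey);
        if (rows == null) {
            return;
        }
        // Decrement the count; returning null removes the entry once it reaches zero.
        rows.computeIfPresent(row, (r, count) -> count > 1 ? count - 1 : null);
        if (rows.isEmpty()) {
            state.remove(joinKey);
        }
    }

    /** Every entry under the join key is visited; how many there are is not known up front. */
    List<String> matches(String joinKey) {
        List<String> out = new ArrayList<>();
        Map<String, Integer> rows = state.getOrDefault(joinKey, Map.of());
        for (Map.Entry<String, Integer> e : rows.entrySet()) {
            for (int i = 0; i < e.getValue(); i++) {
                out.add(e.getKey());
            }
        }
        return out;
    }
}
```

Even when only one row exists under a join key, `matches` still has to iterate, because nothing in the structure guarantees a single entry.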

Re: Force Join Unique Key

Rex Fenley
Ok, but if there is only 1 row per Join key on either side of the join, then wouldn't "iterate all the values in the MapState under the current key" effectively be "iterate 1 value in MapState under the current key" which would be O(1)? Or are you saying that it must seek across the entire dataset for the whole table even for that 1 row on either side of the join?

Thanks for the help so far!

Re: Force Join Unique Key

Jark Wu-3
Yes, exactly. RocksDB has to "seek" the data set because it doesn't know how many entries are under the join key.

Re: Force Join Unique Key

Rex Fenley
I'm reading your response as RocksDB having to seek across the whole dataset for the whole table, which we hope to avoid.

What are the rules for the unique key and unique join key inference? Maybe we can reorganize our plan to allow it to infer unique keys more correctly.

Thanks

Re: Force Join Unique Key

Rex Fenley
I have a few more questions.

Even if a join has no unique keys, couldn't the join key be used to organize records into a tree of groups of records, one group per join key, so that lookups are faster?

I also have been looking at RocksDB docs and it looks like it has a RangeScan operation. I'm guessing then join keys could also be hashed in such a way to enable faster lookup by RangeScan. I also noticed mention of Prefix Iterators, which might actually do what I'm suggesting.

Have either of these been considered?

Thanks!

Re: Force Join Unique Key

Jark Wu-3
Hi Rex,

The join key is already used to organize records. As I said before, "the join key is the key of the keyed states". So iterating over the MapState is actually a range scan (a scan of the join key prefix). However, this performs a "seek" operation, which is slower than a "get" operation.

Best,
Jark
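
The difference between a point "get" and a prefix "seek" can be sketched with plain Java collections. This is only an analogy under stated assumptions: a `TreeMap` stands in for the sorted RocksDB key space, the composite key is assumed to be the join key, a `:` separator and the map key, and `GetVersusSeek` with its methods is a hypothetical name, not a Flink or RocksDB API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical analogy for "get" versus "seek"; not a Flink or RocksDB API. */
final class GetVersusSeek {
    // Sorted store: composite key = joinKey + ':' + mapKey (assumed layout for the sketch).
    private final TreeMap<String, String> mapStateStore = new TreeMap<>();
    // Value-state analogue: exactly one row per join key.
    private final Map<String, String> valueStateStore = new HashMap<>();

    void putMapEntry(String joinKey, String mapKey, String row) {
        mapStateStore.put(joinKey + ':' + mapKey, row);
    }

    void putValue(String joinKey, String row) {
        valueStateStore.put(joinKey, row);
    }

    /** Point lookup: the full key is known, so a single "get" is enough. */
    String get(String joinKey) {
        return valueStateStore.get(joinKey);
    }

    /** Prefix scan: position at the first key >= prefix, then walk until the prefix ends. */
    List<String> prefixScan(String joinKey) {
        String prefix = joinKey + ':';
        List<String> rows = new ArrayList<>();
        for (Map.Entry<String, String> e : mapStateStore.tailMap(prefix, true).entrySet()) {
            if (!e.getKey().startsWith(prefix)) {
                break; // first key outside the prefix; only now is the scan known to be done
            }
            rows.add(e.getValue());
        }
        return rows;
    }
}
```

With a single entry under a join key, `prefixScan` still has to position an iterator and inspect the following key before it can stop, whereas `get` resolves the full key directly.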
