How to disable the state behind `COUNT` sql?

Izual

I implemented my DimTable by extending `LookupTableSource`[1]. It stores data like:

id=1 -> (SH, BJ, SD)

id=2 -> (...)

I then extend `TableFunction` to return the values for the lookup keys, which may be multiple rows. For example, when the lookup key is id=1, `TableFunction.eval` calls:

```
// one collect() call per region stored for the looked-up id
collect("SH");
collect("BJ");
collect("SD");
```


Now I want to get the region count per id, where the id comes from tblEvent.id. The SQL is:


SELECT tblEvent.id, COUNT(tblDim.region) FROM tblEvent JOIN tblDim FOR SYSTEM AS OF tblEvent.proctime ON tblEvent.id = tblDim.id GROUP BY tblEvent.id


I expect the COUNT result to always be 3 for id=1, no matter how many times id=1 appears,

but the actual results are: 3, 6, 9, ...


I think this is because of the state mechanism behind COUNT. How can I turn it off?

Or what is the correct approach here? `StreamQueryConfig.maxIdleStateRetentionTime` or something similar?
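
To make the 3, 6, 9 sequence concrete, here is a minimal Python sketch of a lookup join feeding a grouped COUNT. It is not Flink code: the `DIM` dictionary and the `running_count` function are hypothetical stand-ins, and the sketch assumes one emitted result per event so that the numbers match the ones reported above.

```python
from collections import defaultdict

# Hypothetical stand-in for the dimension table: id -> regions.
DIM = {1: ["SH", "BJ", "SD"]}


def running_count(event_ids):
    """Emit (id, count) per event, keeping one counter per id across events."""
    state = defaultdict(int)  # per-key counter that survives between events
    out = []
    for event_id in event_ids:
        regions = DIM.get(event_id, [])
        if not regions:
            continue  # inner join: no matching dimension rows, no output
        state[event_id] += len(regions)  # every joined row adds to the count
        out.append((event_id, state[event_id]))
    return out


results = running_count([1, 1, 1])
print(results)  # [(1, 3), (1, 6), (1, 9)]
```

Each event with id=1 joins to three region rows, and all of them are added to the same per-key counter, so the count grows by 3 per event instead of staying at 3.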


The reason for not using state in Flink: http://mail-archives.apache.org/mod_mbox/flink-dev/201901.mbox/%3CJIRA.13212450.1548753499000.193293.1548753540145@...%3E

[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#defining-a-tablesource-for-lookups

Re: How to disable the state behind `COUNT` sql?

Benchao Li
Hi,

There is indeed state for the aggregation result, and it cannot be disabled; this is by design.
`StreamQueryConfig.maxIdleStateRetentionTime` controls how long the state is kept.
If you can ensure that the time gap between two records with the same id is larger than, for example, 1 min, then setting the retention time to 1 min can resolve your issue.
If not, you may need to change your dimension table so that it returns the count directly instead of the details.
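
The effect of a retention time can be sketched in plain Python. This is only an illustration under simplifying assumptions: a single retention value, exact cleanup as soon as a key has been idle that long, and hypothetical names (`DIM`, `count_with_retention`). Real cleanup timing in Flink is less exact than this.

```python
# Hypothetical stand-in for the dimension table: id -> regions.
DIM = {1: ["SH", "BJ", "SD"]}


def count_with_retention(events, retention_s):
    """events: list of (timestamp_seconds, id). Returns emitted (id, count)."""
    state = {}  # id -> (count, time of last access)
    out = []
    for ts, event_id in events:
        regions = DIM.get(event_id, [])
        if not regions:
            continue
        count, last_seen = state.get(event_id, (0, ts))
        if ts - last_seen >= retention_s:
            count = 0  # the key was idle long enough for its state to be dropped
        count += len(regions)
        state[event_id] = (count, ts)
        out.append((event_id, count))
    return out


# Records for id=1 arrive 90 s apart, retention is 60 s: state is reset each time.
spaced = count_with_retention([(0, 1), (90, 1), (180, 1)], retention_s=60)
# Records arrive 10 s apart: state is never idle long enough, so counts accumulate.
dense = count_with_retention([(0, 1), (10, 1), (20, 1)], retention_s=60)
print(spaced)  # [(1, 3), (1, 3), (1, 3)]
print(dense)   # [(1, 3), (1, 6), (1, 9)]
```

The second call shows why this only helps when the gap between records with the same id is reliably larger than the retention time.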

In reply to izual <[hidden email]>, Mon, 27 Apr 2020 at 17:06.

--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]

Re:Re: How to disable the state behind `COUNT` sql?

Izual
Thanks, Benchao.

Changing the dimension table might work, but it would require a lot of changes, and `size/count` is not a column of any single dim table.
I noticed that users can define Aggregate Functions[1], but that page also says:
> Accumulators are automatically backup-ed by Flink’s checkpointing mechanism and restored
So is it right to implement my own COUNT/SUM UDF?

In reply to Benchao Li <[hidden email]>, 2020-04-27 17:32:14.


Re: Re: How to disable the state behind `COUNT` sql?

Benchao Li
Hi izual,

IMO, implementing your own COUNT/SUM UDAF doesn't solve the problem.
The state is not managed in the UDAF; it's managed by the aggregation operator, and your UDAF's accumulator will be handled by the operator using state.
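
A rough Python sketch of that division of responsibility follows. `MyCount` and `GroupAggOperator` are hypothetical classes, not Flink APIs; the method names only mirror the create/accumulate/get-value shape of an aggregate function. The point is that the function itself stores nothing, while the operator keeps one accumulator per key.

```python
class MyCount:
    """User-defined aggregate: pure logic, no storage of its own."""

    def create_accumulator(self):
        return [0]

    def accumulate(self, acc, value):
        if value is not None:
            acc[0] += 1

    def get_value(self, acc):
        return acc[0]


class GroupAggOperator:
    """Hypothetical operator that owns one accumulator per key as its state."""

    def __init__(self, udaf):
        self.udaf = udaf
        self.keyed_state = {}  # key -> accumulator, kept across records

    def process(self, key, value):
        if key not in self.keyed_state:
            self.keyed_state[key] = self.udaf.create_accumulator()
        acc = self.keyed_state[key]
        self.udaf.accumulate(acc, value)
        return key, self.udaf.get_value(acc)


op = GroupAggOperator(MyCount())
# Two events with id=1, each joined to three region rows.
joined_rows = [(1, region) for _ in range(2) for region in ("SH", "BJ", "SD")]
emitted = [op.process(key, region) for key, region in joined_rows]
print(emitted[-1])  # (1, 6)
```

Swapping in a different aggregate class changes how values are combined, but the accumulator still lives in the operator's keyed state, so the count keeps growing across events.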

In reply to izual <[hidden email]>, Mon, 27 Apr 2020 at 23:21.


Re: Re: How to disable the state behind `COUNT` sql?

Jark Wu-3
Hi izual,

In this case, I think you should try COUNT DISTINCT instead of COUNT.
DISTINCT deduplicates, so no matter how many times you receive id=1, the region count should always be 3.

SELECT tblEvent.id, COUNT(DISTINCT tblDim.region) FROM tblEvent JOIN tblDim FOR SYSTEM AS OF tblEvent.proctime ON tblEvent.id = tblDim.id GROUP BY tblEvent.id
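
Why DISTINCT keeps the count at 3 can be illustrated with a small Python sketch. The names `DIM` and `running_count_distinct` are hypothetical, and this is a simulation rather than Flink code. Note that state is still kept per id; it just holds the set of distinct regions instead of a plain counter.

```python
from collections import defaultdict

# Hypothetical stand-in for the dimension table: id -> regions.
DIM = {1: ["SH", "BJ", "SD"]}


def running_count_distinct(event_ids):
    """Return the distinct-region count per id as computed after each event."""
    seen = defaultdict(set)  # per-key set of regions already counted
    out = []
    for event_id in event_ids:
        regions = DIM.get(event_id, [])
        if not regions:
            continue
        seen[event_id].update(regions)  # repeated regions do not grow the set
        out.append((event_id, len(seen[event_id])))
    return out


distinct_results = running_count_distinct([1, 1, 1])
print(distinct_results)  # [(1, 3), (1, 3), (1, 3)]
```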

Best,
Jark


In reply to Benchao Li <[hidden email]>, Mon, 27 Apr 2020 at 23:41.


Re:Re: Re: How to disable the state behind `COUNT` sql?

Izual
Thank you, Jark.

I have also tried COUNT DISTINCT ^_^. The only problem is that if `tblEvent` generates two records with the same id, such as:
t1: {"id": 1}
t2: {"id": 1}

then the SQL outputs only one result, when the t1 record is received.
I guess some optimization works in the background when the result does not change?


In reply to Jark Wu <[hidden email]>, 2020-04-28 10:53:34.


Re: Re: Re: How to disable the state behind `COUNT` sql?

Jark Wu-3
Yes. 
This is an optimization: if the new result is the same as the previous result, the operator does not emit a record for the new result.
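
As a hedged illustration of that behavior (a Python simulation with hypothetical names `DIM` and `emit_changes_only`, not the actual operator code), the aggregation can remember the last emitted value per key and skip the output when the new value is identical:

```python
# Hypothetical stand-in for the dimension table: id -> regions.
DIM = {1: ["SH", "BJ", "SD"]}


def emit_changes_only(event_ids):
    """Distinct count per id, emitted only when the result actually changes."""
    seen = {}          # id -> set of distinct regions
    last_emitted = {}  # id -> last result sent downstream
    out = []
    for event_id in event_ids:
        regions = DIM.get(event_id, [])
        if not regions:
            continue
        seen.setdefault(event_id, set()).update(regions)
        new_result = len(seen[event_id])
        if last_emitted.get(event_id) != new_result:
            out.append((event_id, new_result))
            last_emitted[event_id] = new_result
        # otherwise: same result as before, nothing is sent downstream
    return out


changes = emit_changes_only([1, 1])
print(changes)  # [(1, 3)]
```

With the two records t1 and t2 from the question, only t1 produces an output, because t2 leaves the distinct count at 3.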

Best,
Jark

In reply to izual <[hidden email]>, Tue, 28 Apr 2020 at 11:05.