Semantic when table joins table from window

徐涛
Hi All,
// HOP(time, slide, size): a 3-minute window that slides every second
val praiseAggr = tableEnv.sqlQuery("SELECT article_id FROM praise GROUP BY HOP(updated_time, INTERVAL '1' SECOND, INTERVAL '3' MINUTE), article_id")
tableEnv.registerTable("praiseAggr", praiseAggr)
// Non-windowed join of the article table with the windowed result
val finalTable = tableEnv.sqlQuery("SELECT 1 FROM article a JOIN praiseAggr p ON a.article_id = p.article_id")
tableEnv.registerTable("finalTable", finalTable)
I know that praiseAggr, if written to a sink, is in append mode. So if a table joins praiseAggr, what does that table “see”: a table that contains only the latest values, or a table that grows larger and larger? If it is the latter, will it introduce a performance problem?
 Thanks a lot.


Best, 
Henry

Re: Semantic when table joins table from window

Fabian Hueske
Hi,

The semantics of a query do not depend on the way that it is used.
praiseAggr is a table that grows by one row per second for each article_id in the current 3-minute window. If you use that table in a join, the join will fully materialize the table.
This is a special case: because the same row is added multiple times, the state won't grow that quickly, but performance will degrade because each row from article will join with multiple (a growing number of) rows from praiseAggr.
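The following plain-Scala model makes this growth concrete. It is not Flink code, and `HopJoinGrowth` and its methods are hypothetical names. The model rests on three assumptions:

- event time is measured in whole seconds;
- windows have the form [end - 3 min, end) and are emitted once `end` has passed;
- the non-windowed join keeps every emitted row.

A single praise row lands in 180 consecutive windows. An article row that arrives later therefore joins with up to 180 copies of the same article_id.

```scala
// Plain-Scala model (not Flink code) of
//   SELECT article_id FROM praise
//   GROUP BY HOP(updated_time, INTERVAL '1' SECOND, INTERVAL '3' MINUTE), article_id
// followed by a non-windowed join that keeps every emitted row.
object HopJoinGrowth {
  val SlideSec: Long = 1L  // INTERVAL '1' SECOND
  val SizeSec: Long = 180L // INTERVAL '3' MINUTE

  /** Rows emitted by all windows [windowEnd - SizeSec, windowEnd) with windowEnd <= nowSec.
    * `praises` holds (article_id, updated_time in seconds). */
  def emittedRows(praises: Seq[(Long, Long)], nowSec: Long): Seq[Long] =
    for {
      windowEnd <- SlideSec to nowSec by SlideSec
      // GROUP BY article_id inside one window: one row per distinct key
      id <- praises.collect { case (a, t) if t >= windowEnd - SizeSec && t < windowEnd => a }.distinct
    } yield id

  /** Number of praiseAggr rows that a new article row with this id joins with at nowSec. */
  def joinMatches(articleId: Long, praises: Seq[(Long, Long)], nowSec: Long): Int =
    emittedRows(praises, nowSec).count(_ == articleId)

  def main(args: Array[String]): Unit = {
    val praises = Seq((100L, 0L)) // a single praise of article 100 at t = 0 s
    for (now <- Seq(60L, 120L, 180L, 600L))
      println(s"t=${now}s: article 100 joins ${joinMatches(100L, praises, now)} praiseAggr rows")
  }
}
```

In this model, `main` reports 60, 120, 180 and 180 matches. Once the praise has left the window, no new rows are emitted. The 180 copies already held in the join state remain, however, and every further praise adds more.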

Best, Fabian

2018-08-21 12:19 GMT+02:00 徐涛 <[hidden email]>:

Re: Semantic when table joins table from window

徐涛
Hi Fabian,
Thanks for your response. This question has puzzled me for quite a long time.
Suppose praiseAggr has the following values:
window-1     100,101,102
window-2     100,101,103
window-3     100

The last time the article table joins praiseAggr, which of the following values does praiseAggr contain?
1. 100,101,102,100,101,103,100   all elements of all windows
2. 100   the elements of the latest window
3. 100,101,102,103   the distinct values across all windows


Best,
Henry

On 21 Aug 2018, at 8:02 PM, Fabian Hueske <[hidden email]> wrote:

Re: Semantic when table joins table from window

Hequn Cheng
Hi Henry,

praiseAggr is an append table, so it contains "100,101,102,100,101,103,100".
1. If you change your SQL to s"SELECT article_id FROM praise GROUP BY article_id", the answer is "100,101,102,103".
2. If you change your SQL to s"SELECT last_value(article_id) FROM praise", the answer is "100".
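The three readings can be reproduced with a tiny plain-Scala model of Henry's example. This is not Flink code, and the names are hypothetical.

The model makes two simplifying assumptions:

- Every praise row falls into at least one window. The distinct keys of the append table then equal those of `GROUP BY article_id`.
- `last_value` means "the most recently emitted row".

```scala
// Plain-Scala model (not Flink code) of the three readings of Henry's example.
object WindowTableReadings {
  // article_id rows emitted by window-1, window-2 and window-3, in emission order
  val windows: Seq[Seq[Int]] = Seq(Seq(100, 101, 102), Seq(100, 101, 103), Seq(100))

  // Append table (the windowed query): every emitted row is kept
  def appendTable: Seq[Int] = windows.flatten

  // Non-windowed GROUP BY article_id: one (continuously updated) row per distinct key
  def groupByKey: Seq[Int] = appendTable.distinct.sorted

  // "Last value" over the whole stream: only the most recently emitted row survives
  def lastValue: Option[Int] = appendTable.lastOption

  def main(args: Array[String]): Unit = {
    println(s"append table: ${appendTable.mkString(",")}")
    println(s"group by key: ${groupByKey.mkString(",")}")
    println(s"last value:   ${lastValue.getOrElse("none")}")
  }
}
```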

Best, Hequn

On Tue, Aug 21, 2018 at 8:52 PM, 徐涛 <[hidden email]> wrote:

Re: Semantic when table joins table from window

徐涛
Hi Fabian,
So maybe I cannot join a table generated from a window, because that table keeps growing over time and the system may crash one day.

I am working on a system that calculates the “score” of each article, which consists of the count of praises, the count of responses, etc.
Because I cannot use Flink to keep all articles, I decided to update only the scores of articles created within the last 3 days.

I have two choices:
1. Join the article table with the praise and response tables, then window:
select a.article_id, count(p.praise_id) as pCount, count(r.response_id) as rCount
from
article a
left join
praise p on a.article_id = p.article_id
left join
response r on a.article_id = r.article_id
group by hop(a.updated_time, interval '1' minute, interval '3' day), a.article_id
2. Window the article, praise, and response tables separately, then join them together:
select aAggr.article_id, pAggr.pCount, rAggr.rCount
from
(select article_id from article group by hop(updated_time, interval '1' minute, interval '3' day), article_id) aAggr
left join
(select article_id, count(praise_id) as pCount from praise group by hop(updated_time, interval '1' minute, interval '3' day), article_id) pAggr on aAggr.article_id = pAggr.article_id
left join
(select article_id, count(response_id) as rCount from response group by hop(updated_time, interval '1' minute, interval '3' day), article_id) rAggr on aAggr.article_id = rAggr.article_id

Maybe I should choose option 1 (join, then window) rather than option 2 (window, then join).
Please correct me if I am wrong.

I have some worries about option 1.
I do not know how Flink works internally, but it seems that in this SQL the article, praise, and response tables keep growing as time goes by. Will that introduce performance issues?

Best,
Henry

On 21 Aug 2018, at 9:29 PM, Hequn Cheng <[hidden email]> wrote:

Re: Semantic when table joins table from window

Hequn Cheng
Hi Henry,

As for choice 1:
  • The state size of the join depends on the size of its input tables, not of the result table, so the state size of the join in choice 1 depends on how many article_id, praise_id, and response_id values there are.
  • Also, a non-window join merges identical rows in its state, i.e., it stores <Row, RowCnt>, so the state size won't grow if you keep pouring in the same article_id. I think the problem here is that you need a distinct before the join, so that a praise_id won't join with multiple identical article_ids; otherwise the correctness of the result is affected.
  • I think you need to aggregate before the join to make sure the result is correct, because there are duplicate article_ids after article joins praise, and this inflates the value of count(r.response_id).
  • You can't use a window or other bounded operators after a non-window join. The time attribute fields cannot be passed through because of a semantic conflict.
  • A hop window with a large fixed size and a small hop interval should be avoided, because the data is duplicated across many windows. For example, a hopping window of 15 minutes size and 5 minutes hop interval assigns each row to 3 different windows of 15 minutes size.
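The redundancy of hopping windows is easy to quantify. When the size is a multiple of the slide, each row belongs to size / slide windows.

The plain-Scala sketch below lists the windows that contain a given timestamp. It uses hypothetical names, is not Flink code, and assumes window starts are aligned to multiples of the slide.

```scala
// Plain-Scala sketch (not Flink code): which hopping windows contain a timestamp?
object HopAssignment {
  /** Start times of all windows [start, start + size) that contain `ts`,
    * assuming window starts are aligned to multiples of `slide`. */
  def windowStarts(ts: Long, size: Long, slide: Long): List[Long] = {
    val latestStart = ts - Math.floorMod(ts, slide) // last aligned start <= ts
    Iterator
      .iterate(latestStart)(_ - slide) // step back one slide at a time
      .takeWhile(_ > ts - size)        // stop once the window no longer reaches ts
      .toList
      .reverse
  }

  def main(args: Array[String]): Unit = {
    println(windowStarts(0L, 15L, 5L))               // 15-minute windows, 5-minute hop
    println(windowStarts(0L, 3L * 24 * 60, 1L).size) // 3-day windows, 1-minute hop
  }
}
```

For t = 0 with 15-minute windows and a 5-minute hop, the sketch finds the three windows starting at -10, -5 and 0. For Henry's hop(..., interval '1' minute, interval '3' day), measured in minutes, it finds 3 × 24 × 60 = 4320 windows per row.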
As for choice 2:
  • I think you need another field (for example, HOP_START) when joining the three tables, so that only records from the same window are joined.
To solve your problem, I think you can do a non-window GROUP BY first and then join the three result tables. Furthermore, a state retention time can be set to keep the state from growing larger.
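A plain-Scala model of choice 1 shows why aggregating before the join matters. It is not Flink code, and `JoinThenCount`, `Praise` and `Response` are hypothetical names.

The model computes the final result of the two LEFT JOINs followed by COUNT per article. It ignores the intermediate retractions a streaming join would emit. It then compares that result with counting per table first and joining on article_id afterwards.

```scala
// Plain-Scala model (not Flink code) of choice 1 versus "aggregate first, then join".
object JoinThenCount {
  final case class Praise(articleId: Int, praiseId: Int)
  final case class Response(articleId: Int, responseId: Int)

  // LEFT JOIN semantics: no match yields a single row padded with NULL (None)
  private def orNull[T](matches: Seq[T]): Seq[Option[T]] =
    if (matches.isEmpty) Seq(None) else matches.map(Some(_))

  /** article LEFT JOIN praise LEFT JOIN response, then COUNT(p.praise_id), COUNT(r.response_id). */
  def joinThenCount(articles: Seq[Int], ps: Seq[Praise], rs: Seq[Response]): Map[Int, (Int, Int)] =
    articles.map { a =>
      val joined = for {
        p <- orNull(ps.filter(_.articleId == a).map(_.praiseId))
        r <- orNull(rs.filter(_.articleId == a).map(_.responseId))
      } yield (p, r)
      // COUNT(col) skips NULLs, but each praise is repeated once per response (and vice versa)
      (a, (joined.count(_._1.isDefined), joined.count(_._2.isDefined)))
    }.toMap

  /** GROUP BY article_id on each table first, then join the per-article counts. */
  def countThenJoin(articles: Seq[Int], ps: Seq[Praise], rs: Seq[Response]): Map[Int, (Int, Int)] = {
    val pCount = ps.groupBy(_.articleId).map { case (k, v) => (k, v.size) }
    val rCount = rs.groupBy(_.articleId).map { case (k, v) => (k, v.size) }
    articles.map(a => (a, (pCount.getOrElse(a, 0), rCount.getOrElse(a, 0)))).toMap
  }
}
```

Take article 1 with two praises and three responses. The join produces 2 × 3 = 6 rows, so `joinThenCount` reports (6, 6) while `countThenJoin` reports the correct (2, 3).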

Best, Hequn

On Tue, Aug 21, 2018 at 10:07 PM 徐涛 <[hidden email]> wrote:

Re: Semantic when table joins table from window

徐涛
Hi Hequn,
Thanks a lot for your response! This helps me understand the mechanism more clearly.

I have another question:
How can I use Flink to accomplish time attenuation (time decay)?
If I use the join-plus-retention-time solution, I only get the incremental data. But some other data may need to be recomputed because of the time attenuation. How do I flush those?

Best,
Henry

On 22 Aug 2018, at 10:30 AM, Hequn Cheng <[hidden email]> wrote:

Re: Semantic when table joins table from window

Hequn Cheng
Hi Henry,

You can increase the retention time to make sure that none of the data you need expires.
As for the incremental updates, I think you can sink the results into a key-value store, say HBase. The HBase table then contains the complete, latest data set you want, so you don't need to flush anything again. Would that satisfy your scenario?
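Both suggestions can be illustrated with a simplified model. It is plain Scala, not Flink's implementation, and all names are hypothetical. It has two parts:

- Per-key state that is dropped once the key has been idle for longer than a retention time.
- An upsert sink keyed by article_id, in which every new result overwrites the previous one. This is similar in spirit to writing each update to the same HBase row key.

```scala
import scala.collection.mutable

// Simplified model (not Flink's implementation) of idle-state retention and an upsert sink.
object RetentionAndUpsert {
  /** Running count per key; a key idle for longer than `retention` loses its state. */
  final class ExpiringCounter(retention: Long) {
    private val state = mutable.Map.empty[String, (Long, Long)] // key -> (count, last access time)

    def add(key: String, now: Long): Long = {
      expire(now)
      val count = state.get(key).map(_._1).getOrElse(0L) + 1
      state(key) = (count, now)
      count
    }

    /** Drop every key whose last access is more than `retention` time units ago. */
    def expire(now: Long): Unit = {
      val idleKeys = state.filter { case (_, (_, lastAccess)) => now - lastAccess > retention }.keys.toList
      state --= idleKeys
    }

    def size: Int = state.size
  }

  /** Upsert sink keyed by article_id: a new value simply overwrites the old one. */
  def upsert(sink: Map[String, Long], key: String, value: Long): Map[String, Long] =
    sink.updated(key, value)
}
```

Suppose the retention is 10 time units and a key is updated at 0, 5 and 20. The key expires in between, so its count restarts at 1 instead of reaching 3. This is the trade-off behind choosing the retention time.

Every update written to the sink overwrites the previous value for its key. The sink table therefore always holds the latest score per article without a separate flush.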

Best, Hequn


On Wed, Aug 22, 2018 at 2:51 PM 徐涛 <[hidden email]> wrote:

Re: Semantic when table joins table from window

徐涛
In reply to this post by Hequn Cheng
Hi Hequn,
You wrote: "You can't use a window or other bounded operators after a non-window join. The time attribute fields cannot be passed through because of a semantic conflict."
Why does Flink have this limitation?
I have a temporary view:
val finalTable = tableEnv.sqlQuery(
  """SELECT * FROM A
    |JOIN B ON xxxx
    |JOIN C ON xxxx""".stripMargin)
tableEnv.registerTable("finalTable", finalTable)

I want to window this table because I want it to emit the last 1 minute of results every second (a 1-minute window that slides by 1 second), but obviously I cannot do this now. How can I make finalTable produce that output? And if a table is a retract stream, will an item that was already added to a window be retracted as well?

Thanks a lot.

Best
Henry



在 2018年8月22日,上午10:30,Hequn Cheng <[hidden email]> 写道:

Hi Hery,

As for choise1:
  • The state size of join depends on it's input table size, not the result table, so the state size of join of choise1 depends on how many article id, praise id and response_id. 
  • Also non-window join will merge same rows in it's state, i.e, <Row, RowCnt>, so the state size won't grows if you keep pouring same article id. I think the problem here is you need a distinct before join, so that a praise id won't join multi same article ids, and this will influence the correctness of the result.
  • I think you need do aggregate before join to make sure the correctness of the result. Because there are duplicated article id after article join praise and this will influence the value of count(r.response_id).
  • You can't use window or other bounded operators after non-window join. The time attribute fields can not be passed through because of semantic conflict.
  • Hop window with large fixed duration and small hop interval should be avoided. Data will be redundant in various windows. For example, a hopping window of 15 minutes size and 5 minute hop interval assigns each row to 3 different windows of 15 minute size.
As for choice2:
  • I think you need another filed(for example, HOP_START) when join the three tables. Only join records in same window.
To solve your problem, I think we can do non-window group by first and then join three result tables. Furthermore, state retention time can be set to keep state from growing larger.

Best, Hequn

On Tue, Aug 21, 2018 at 10:07 PM 徐涛 <[hidden email]> wrote:
Hi Fabian,
So maybe I can not join a table that generate from a window, because the table is getting larger and larger as the time goes, maybe the system will crash one day. 

I am working on a system that calculate the “score" of article, which is consist of the count of article praise, the count of article response, etc
Because I can not use flink to save all the article, I decide to update the score of the article that created in 3 days.

I have two choises,
1. join the article table and praise table, response table then window
select a.article_id, count(p.praise_id) as pCount, count(r.response_id) as rCount
from
article a
left join
praise p on a.article_id = p.article_id
left join
response r on a.article_id = r.article_id
group by hop(updated_time, interval '1' minute,interval '3' day) , article_id
2. window the article table, window the priase table, window the response table ,then join them together
select aAggr.article_id, pAggr.pCount, rAggr.rCount
(select article_id from article group by hop(updated_time, interval '1' minute,interval '3' day) , article_id) aAggr
left join
(select article_id,count(praise_id) as pCount from praise group by hop(updated_time, interval '1' minute,interval '3' day) , article_id) pAggr on aAggr.article_id=pAggr.article_id
left join
(select article_id,count(response_id) as rCount from response group by hop(updated_time, interval '1' minute,interval '3' day) , article_id) rAggr on aAggr.article_id=rAggr.article_id

Maybe I should choose 1 (join, then window) rather than 2 (window, then join).
Please correct me if I am wrong.

I have one worry about choice 1:
I do not know how Flink works internally, but it seems that in this SQL the article, praise, and response tables keep growing as time goes by. Will that introduce a performance issue?

Best,
Henry

On Aug 21, 2018, at 9:29 PM, Hequn Cheng <[hidden email]> wrote:

Hi Henry,

praiseAggr is an append table, so it contains "100,101,102,100,101,103,100".
1. If you change your SQL to s"SELECT article_id FROM praise GROUP BY article_id", the answer is "100,101,102,103".
2. If you change your SQL to s"SELECT last_value(article_id) FROM praise", the answer is "100".
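The three candidate answers can be modeled with plain Scala collections, treating praiseAggr as an append-only log of the rows emitted by each window and using the example values from the question. This models only the query semantics, not how Flink executes them; the LAST_VALUE-style result is approximated by taking the last appended element.

```scala
// Rows emitted by each hop window (example values from the question).
val windowOutputs = Seq(
  Seq(100, 101, 102), // window-1
  Seq(100, 101, 103), // window-2
  Seq(100)            // window-3
)

// praiseAggr as an append table: every emitted row is kept, in arrival order.
val appendTable: Seq[Int] = windowOutputs.flatten

// Non-window GROUP BY article_id: one row per distinct id.
val grouped: Seq[Int] = appendTable.distinct.sorted

// LAST_VALUE-style result: only the most recently appended id.
val latest: Int = appendTable.last

println(appendTable.mkString(",")) // 100,101,102,100,101,103,100
println(grouped.mkString(","))     // 100,101,102,103
println(latest)                    // 100
```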

Best, Hequn

On Tue, Aug 21, 2018 at 8:52 PM, 徐涛 <[hidden email]> wrote:
Hi Fabian,
Thanks for your response. This question has puzzled me for quite a long time.
Suppose praiseAggr has the following values:
window-1     100,101,102
window-2     100,101,103
window-3     100

The last time the article table joins praiseAggr, which of the following values does the praiseAggr table contain?
1— 100,101,102,100,101,103,100    all elements of all windows
2— 100    the elements of the latest window
3— 100,101,102,103    the distinct values of all windows


Best,
Henry


On Aug 21, 2018, at 8:02 PM, Fabian Hueske <[hidden email]> wrote:

Hi,

The semantics of a query do not depend on the way that it is used.
praiseAggr is a table that grows by one row per second per article_id. If you use that table in a join, the join will fully materialize the table.
This is a special case because the same row is added multiple times, so the state won't grow that quickly, but the performance will decrease because each row from article will join with multiple (a growing number of) rows from praiseAggr.
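The effect described here can be illustrated with a toy model of one side of a non-windowed join in plain Scala. It assumes, for illustration only, that the join stores each distinct row once per key together with a counter, and that a single praise for article 100 is reported by 180 consecutive hop windows (3 minutes / 1 second). `RightSideState` is a hypothetical name, not a Flink class.

```scala
// Toy model of one side of a non-windowed join: key -> (row -> number of copies).
final case class RightSideState(rows: Map[Int, Map[Int, Long]] = Map.empty) {
  def add(key: Int, row: Int): RightSideState = {
    val perKey = rows.getOrElse(key, Map.empty[Int, Long])
    RightSideState(rows.updated(key, perKey.updated(row, perKey.getOrElse(row, 0L) + 1)))
  }
  // Number of state entries actually stored (distinct rows).
  def storedEntries: Int = rows.values.map(_.size).sum
  // Number of join results emitted when a left-side row with this key arrives.
  def matchesFor(key: Int): Long = rows.getOrElse(key, Map.empty[Int, Long]).values.sum
}

// praiseAggr emits the row (article_id = 100) once per second, for 180 seconds.
val state = (1 to 180).foldLeft(RightSideState())((s, _) => s.add(key = 100, row = 100))

println(s"stored entries: ${state.storedEntries}")                 // 1
println(s"results for one new article row: ${state.matchesFor(100)}") // 180
```

The stored state stays at one entry, but every new article row with id 100 has to produce 180 join results, which is the growing join cost described above.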

Best, Fabian

2018-08-21 12:19 GMT+02:00 徐涛 <[hidden email]>:
Hi All,
// Hopping window: 3-minute windows that slide every second, grouped by article_id
var praiseAggr = tableEnv.sqlQuery(s"SELECT article_id FROM praise GROUP BY HOP(updated_time, INTERVAL '1' SECOND, INTERVAL '3' MINUTE), article_id")
tableEnv.registerTable("praiseAggr", praiseAggr)
// Non-window join of article with the windowed result
var finalTable = tableEnv.sqlQuery(s"SELECT 1 FROM article a JOIN praiseAggr p ON a.article_id = p.article_id")
tableEnv.registerTable("finalTable", finalTable)
 I know that praiseAggr, if written to a sink, is in append mode. So if a table joins praiseAggr, what does it "see": a table that contains only the latest values, or a table that grows larger and larger? If it is the latter, will it introduce a performance problem?
 Thanks a lot.


Best, 
Henry






Re: Semantic when table joins table from window

Fabian Hueske-2
Hi,

Currently, Flink's window operators require increasing timestamp attributes. This limitation exists to be able to clean up the state of a window operator. A join operator does not preserve the order of timestamps. Hence, timestamp attributes lose their monotonicity property and a window operator cannot be applied.

Have you tried using a window join? Window joins preserve the timestamp order.
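The need for increasing timestamps can be sketched with a toy tumbling-window counter in plain Scala. Simplifying assumptions: the watermark is just the largest timestamp seen so far, a window is emitted and its state dropped once the watermark reaches its end, and rows arriving later for a closed window are discarded. Flink's real operators use watermarks and allowed lateness, so this only illustrates the idea; `countPerWindow` is a hypothetical helper.

```scala
// Toy tumbling-window counter that frees a window's state once the watermark passes its end.
def countPerWindow(timestamps: Seq[Long], size: Long): (Map[Long, Int], Int) = {
  var watermark = Long.MinValue
  var open = Map.empty[Long, Int]    // window start -> count (state still held)
  var emitted = Map.empty[Long, Int] // results of closed windows
  var dropped = 0                    // rows that arrived after their window closed

  for (ts <- timestamps) {
    val start = ts - (ts % size)
    if (start + size <= watermark) {
      dropped += 1 // its window was already emitted and its state cleared
    } else {
      open = open.updated(start, open.getOrElse(start, 0) + 1)
    }
    watermark = math.max(watermark, ts)
    // Emit and forget every window whose end is at or before the watermark.
    val (closed, stillOpen) = open.partition { case (s, _) => s + size <= watermark }
    emitted ++= closed
    open = stillOpen
  }
  (emitted ++ open, dropped) // flush the remaining windows at end of input
}

val ordered   = Seq(1L, 2L, 11L, 12L, 21L) // increasing timestamps
val reordered = Seq(1L, 11L, 21L, 2L, 12L) // same rows, order lost (e.g. after a join)

println(countPerWindow(ordered, size = 10L))
println(countPerWindow(reordered, size = 10L))
```

With ordered input every row is counted; after reordering, two rows arrive for windows whose state was already cleaned up and are dropped.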

Fabian

徐涛 <[hidden email]> wrote on Tue, Aug 28, 2018, 11:42:
Hi Hequn,
You wrote: "You can't use window or other bounded operators after non-window join. The time attribute fields can not be passed through because of semantic conflict."
Why does Flink have this limitation?
I have a temporary view:
var finalTable = tableEnv.sqlQuery("""select * from
  A join B on xxxx
  join C on xxxx""")
tableEnv.registerTable("finalTable", finalTable)

I want to window this table because I want it to output a 1-minute window every second, but obviously I cannot do this now. How can I make "finalTable" output a 1-minute window every second? And if a table is a retract stream, will items already added to a window be retracted as well?

      Thanks a lot.

Best
Henry




Re: Semantic when table joins table from window

徐涛
Hi Fabian,
I am working on an application that computes the "score" of an article from the number of praises and reduces the score over time. I am weighing two choices:
1. Use a non-windowed ("global window") join of article and article praise with 3 days of state retention. But I cannot get the current time (the time is fixed when the program starts), so I cannot compute the reduced score. I would have to sink the data and then write some crontab jobs to update the score.
2. Use a sliding window join with a window length of 3 days, sliding by one minute. This way I can get the window end time, but so much data is duplicated across windows that there are performance issues.
Neither choice is good enough. I am wondering if there are other solutions. Thanks a lot.
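The cost of choice 2 can be estimated with simple arithmetic: with a sliding (hopping) window whose size is a multiple of the slide, every row is copied into size / slide windows. A minimal Scala sketch of that estimate follows; `copiesPerRow` is a hypothetical helper, and the 1-hour slide is only an illustrative comparison, not something proposed in the thread.

```scala
import scala.concurrent.duration._

// Number of overlapping sliding windows each row is assigned to,
// assuming the window size is an exact multiple of the slide.
def copiesPerRow(size: FiniteDuration, slide: FiniteDuration): Long = {
  require(size.toMillis % slide.toMillis == 0, "size must be a multiple of slide")
  size.toMillis / slide.toMillis
}

// Choice 2: 3-day windows sliding by 1 minute.
val perMinuteSlide = copiesPerRow(3.days, 1.minute)
// For comparison only: the same 3-day window sliding by 1 hour.
val perHourSlide = copiesPerRow(3.days, 1.hour)

println(s"1-minute slide: $perMinuteSlide copies per row") // 4320
println(s"1-hour slide:   $perHourSlide copies per row")   // 72
```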

Best
Henry

On Aug 28, 2018, at 8:05 PM, Fabian Hueske <[hidden email]> wrote:

Hi,

Currently, Flink's window operators require increasing timestamp attributes. This limitation exists to be able to clean up the state of a window operator. A join operator does not preserve the order of timestamps. Hence, timestamp attributes lose their monotonicity property and a window operator cannot be applied.

Have you tried using a window join? Window joins preserve the timestamp order.

Fabian


Re: Semantic when table joins table from window

Hequn Cheng
Hi Henry,

Fabian is right. You can try to use a window join if you want a bounded join.

According to your description, I think what you want is (correct me if I'm wrong):
- Only join data within 3 days
- Calculate the score in a bounded way
- Retract previous scores that exceed 3 days

So I think a window join plus a bounded OVER aggregation may solve your problem. Do the window join with the condition `article.time between praise.time - 3days and praise.time + 3days`. You don't have to add a sliding window before the window join. After the window join, you can perform a bounded OVER aggregation with a 3-day interval to get the scores.
There is documentation about window joins [1] and OVER aggregations [2].
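The semantics of this suggestion can be modeled in memory with plain Scala before writing the Flink query. The rows, the hour-based timestamps, and names such as `threeDays` are hypothetical; the join keeps only praises within 3 days of the article, and the running count mimics a bounded OVER aggregation partitioned by article and ordered by time with a 3-day RANGE PRECEDING bound.

```scala
// Hypothetical rows; times are in hours for readability.
case class Article(id: Int, time: Long)
case class Praise(articleId: Int, time: Long)

val threeDays = 72L // hours

val articles = Seq(Article(1, 0L), Article(2, 50L))
val praises  = Seq(Praise(1, 5L), Praise(1, 30L), Praise(1, 80L), Praise(2, 60L))

// Time-bounded ("window") join:
// article.time BETWEEN praise.time - 3 days AND praise.time + 3 days.
val joined: Seq[(Int, Long)] = for {
  a <- articles
  p <- praises
  if p.articleId == a.id
  if a.time >= p.time - threeDays && a.time <= p.time + threeDays
} yield (a.id, p.time)

// Bounded OVER aggregation per article: for each joined row, count the joined rows of
// the same article whose time lies in [row.time - 3 days, row.time].
val scores: Seq[(Int, Long, Int)] = joined.sortBy(_._2).map { case (id, t) =>
  val inRange = joined.count { case (id2, t2) => id2 == id && t2 >= t - threeDays && t2 <= t }
  (id, t, inRange)
}

scores.foreach(println)
```

Article 1's praise at hour 80 is more than 3 days after the article and is never joined, and each score counts only joined rows of the same article from the preceding 3 days.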


On Tue, Aug 28, 2018 at 9:59 PM 徐涛 <[hidden email]> wrote:
Hi Fabian,
I am working on an application that computes the "score" of an article from the number of praises and reduces the score over time. I am weighing two choices:
1. Use a non-windowed ("global window") join of article and article praise with 3 days of state retention. But I cannot get the current time (the time is fixed when the program starts), so I cannot compute the reduced score. I would have to sink the data and then write some crontab jobs to update the score.
2. Use a sliding window join with a window length of 3 days, sliding by one minute. This way I can get the window end time, but so much data is duplicated across windows that there are performance issues.
Neither choice is good enough. I am wondering if there are other solutions. Thanks a lot.

Best
Henry
