Hi All,
var praiseAggr = tableEnv.sqlQuery(
  s"SELECT article_id FROM praise GROUP BY HOP(updated_time, INTERVAL '1' SECOND, INTERVAL '3' MINUTE), article_id")
tableEnv.registerTable("praiseAggr", praiseAggr)
var finalTable = tableEnv.sqlQuery(
  s"SELECT 1 FROM article a join praiseAggr p on a.article_id = p.article_id")

I know that praiseAggr, if written to a sink, is in append mode. So if another table joins praiseAggr, what does that table "see": a table that contains only the latest values, or a table that grows larger and larger? If it is the latter, will it introduce a performance problem?

Thanks a lot.

Best,
Henry
Hi,

The semantics of a query do not depend on the way it is used. praiseAggr is a table that grows by one row per second and article_id. If you use that table in a join, the join will fully materialize it. This is a special case because the same row is added multiple times, so the state won't grow that quickly, but performance will degrade because each row from article joins with a growing number of rows from praiseAggr.

Best,
Fabian

2018-08-21 12:19 GMT+02:00 徐涛 <[hidden email]>:
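To make the append-only behaviour visible, here is a minimal sketch (assuming the Flink 1.6 Scala Table API and the tableEnv, praiseAggr and finalTable definitions from the question above; the imports and Row conversions are additions for illustration, not part of the thread):

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.scala._
    import org.apache.flink.types.Row

    // praiseAggr only ever adds rows (one per HOP window fire and article_id),
    // so it can be emitted as an append stream
    tableEnv.toAppendStream[Row](praiseAggr).print()

    // toRetractStream works for any dynamic table; it emits (isAdd, row) pairs,
    // which is a convenient way to watch how the join result evolves
    tableEnv.toRetractStream[Row](finalTable).print()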
Hi Fabian,

Thanks for your response. This question has puzzled me for quite a long time.

If praiseAggr emits the following values:
    window-1: 100, 101, 102
    window-2: 100, 101, 103
    window-3: 100
then, when the article table joins praiseAggr, which of the following does the praiseAggr table contain?
    1. 100, 101, 102, 100, 101, 103, 100 (all elements of all windows)
    2. 100 (the elements of the latest window only)
    3. 101, 102, 103 (the distinct values across all windows)

Best,
Henry
Hi Henry,

praiseAggr is an append table, so it contains "100,101,102,100,101,103,100".

1. If you change your SQL to s"SELECT article_id FROM praise GROUP BY article_id", the answer is "101,102,103".
2. If you change your SQL to s"SELECT last_value(article_id) FROM praise", the answer is "100".

Best,
Hequn

On Tue, Aug 21, 2018 at 8:52 PM, 徐涛 <[hidden email]> wrote:
Hi Fabian,
So maybe I cannot join a table that is generated from a window, because the table gets larger and larger as time goes by and the system may crash one day.

I am working on a system that calculates the "score" of an article, which consists of the count of article praises, the count of article responses, etc. Because I cannot use Flink to store all the articles, I decided to update the score only for articles created within the last 3 days. I have two choices:

1. Join the article, praise and response tables, then window:

    select a.article_id, count(p.praise_id) as pCount, count(r.response_id) as rCount
    from article a
    left join praise p on a.article_id = p.article_id
    left join response r on a.article_id = r.article_id
    group by hop(updated_time, interval '1' minute, interval '3' day), article_id

2. Window the article table, window the praise table, window the response table, then join them together:

    select aAggr.article_id, pAggr.pCount, rAggr.rCount
    from (select article_id from article
          group by hop(updated_time, interval '1' minute, interval '3' day), article_id) aAggr
    left join (select article_id, count(praise_id) as pCount from praise
               group by hop(updated_time, interval '1' minute, interval '3' day), article_id) pAggr
      on aAggr.article_id = pAggr.article_id
    left join (select article_id, count(response_id) as rCount from response
               group by hop(updated_time, interval '1' minute, interval '3' day), article_id) rAggr
      on aAggr.article_id = rAggr.article_id

Maybe I should choose 1, join then window, instead of window then join. Please correct me if I am wrong. I have some worries about choice 1: I do not know how Flink works internally, but it seems that in this SQL the article, praise and response tables keep growing as time goes by. Will that introduce performance issues?

Best,
Henry
Hi Henry,

As for choice 1:
As for choice 2:
To solve your problem, I think we can do a non-window group by first and then join the three result tables. Furthermore, a state retention time can be set to keep the state from growing larger.

Best,
Hequn

On Tue, Aug 21, 2018 at 10:07 PM 徐涛 <[hidden email]> wrote:
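A minimal sketch of this suggestion (assuming the Flink 1.6 Table API; the table and column names follow the SQL from the previous mail, and the StreamQueryConfig part shows how the state retention time could be set):

    import org.apache.flink.api.common.time.Time

    // non-window aggregations: one continuously updated count per article_id
    val pAggr = tableEnv.sqlQuery(
      "SELECT article_id, COUNT(praise_id) AS pCount FROM praise GROUP BY article_id")
    val rAggr = tableEnv.sqlQuery(
      "SELECT article_id, COUNT(response_id) AS rCount FROM response GROUP BY article_id")
    tableEnv.registerTable("pAggr", pAggr)
    tableEnv.registerTable("rAggr", rAggr)

    // join the updating aggregate tables with the article table
    val scoreInput = tableEnv.sqlQuery(
      """SELECT a.article_id, p.pCount, r.rCount
        |FROM article a
        |LEFT JOIN pAggr p ON a.article_id = p.article_id
        |LEFT JOIN rAggr r ON a.article_id = r.article_id""".stripMargin)

    // expire per-key state that has not been updated for roughly 3 days;
    // the config is passed along when the table is emitted to a sink or DataStream
    val qConfig = tableEnv.queryConfig
    qConfig.withIdleStateRetentionTime(Time.days(3), Time.days(4))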
Hi Hequn,
Thanks a lot for your response! This helps me understand the mechanism more clearly.

I have another question: how do I use Flink to accomplish time attenuation (decaying a score over time)? If I use the join plus state retention solution, I only get the incremental data, but some other data may need to be recomputed because of the time attenuation. How do I flush that data?

Best,
Henry
Hi Henry,

You can increase the retention time to make sure none of the data you need expires. As for the incremental data, I think we can sink the results into a key-value store, say HBase. The HBase table then contains the total and latest data set you want, so you don't need to flush again. Would that satisfy your scenario?

Best,
Hequn

On Wed, Aug 22, 2018 at 2:51 PM 徐涛 <[hidden email]> wrote:
Hi Hequn,
You can't use a window or other bounded operators after a non-window join; the time attribute fields cannot be passed through because of the semantic conflict. Why does Flink have this limitation?

I have a temp view:

    var finalTable = tableEnv.sqlQuery(s"select * from A join B on xxxx join C on xxxx")
    tableEnv.registerTable("finalTable", finalTable)

and I want to window this table because I want it to output the last 1 minute of data every second. Obviously I cannot do this at the moment, so may I ask how I can make such a "final table" output the last 1 minute every second? And if a table is backed by a retract stream, will items that were added to the window also be retracted?

Thanks a lot.

Best,
Henry
Hi,

Currently, Flink's window operators require increasing timestamp attributes. This limitation exists so that the state of a window operator can be cleaned up. A join operator does not preserve the order of timestamps; hence, timestamp attributes lose their monotonicity property and a window operator cannot be applied afterwards.

Have you tried using a window join? Window joins preserve the timestamp order.

Fabian

徐涛 <[hidden email]> wrote on Tue, 28 Aug 2018, 11:42:
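For reference, a time-windowed join over the tables from this thread could look like the sketch below (continuing the Scala/SQL style used earlier; it assumes updated_time is the rowtime attribute of both tables, and the registered name praiseWithin3Days is made up for illustration):

    // join each article only with praises whose rowtime lies within +/- 3 days
    // of the article's rowtime, so the join state stays bounded
    val windowedJoin = tableEnv.sqlQuery(
      """SELECT a.article_id, p.praise_id, p.updated_time
        |FROM article a, praise p
        |WHERE a.article_id = p.article_id
        |  AND p.updated_time BETWEEN a.updated_time - INTERVAL '3' DAY
        |                         AND a.updated_time + INTERVAL '3' DAY""".stripMargin)
    tableEnv.registerTable("praiseWithin3Days", windowedJoin)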
Hi Fabian,
I am working on an application that computes the "score" of an article from the number of praises and reduces the score over time. I am weighing two choices:

1. Use a global join (no window) of the article and praise tables with 3 days of state retention. But I cannot get the current time (the time is fixed when the program starts), so I cannot compute the reduced score. I would have to sink the data and then write some crontab jobs to update the scores.
2. Use a sliding window join with a window length of 3 days, sliding by one minute. This way I can get the window end time, but so much data is duplicated across the windows that there are performance issues.

Neither choice is good enough, so I am wondering whether there is some other solution.

Thanks a lot.

Best,
Henry
Hi Henry,

Fabian is right. You can try a window join if you want a bounded join. According to your description, I think what you want is (correct me if I'm wrong):
- only join data within 3 days
- calculate the score in a bounded way
- retract the previous score once it is older than 3 days

So I think a window join plus a bounded OVER aggregation may solve your problem. Do the window join with `article.time BETWEEN praise.time - INTERVAL '3' DAY AND praise.time + INTERVAL '3' DAY`; you don't have to add a sliding window before the window join. After the window join, you can perform a bounded OVER aggregation with a 3-day interval to get the scores. There are documents about window joins[1] and OVER aggregations[2].

On Tue, Aug 28, 2018 at 9:59 PM 徐涛 <[hidden email]> wrote:
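A minimal sketch of the bounded OVER step (assuming the windowed-join result is registered under the made-up name praiseWithin3Days from the earlier sketch, and that updated_time is still a rowtime attribute after the join):

    // per article, count only the praises whose rowtime lies within the last
    // 3 days relative to the current row; older rows no longer contribute
    val scores = tableEnv.sqlQuery(
      """SELECT
        |  article_id,
        |  COUNT(praise_id) OVER (
        |    PARTITION BY article_id
        |    ORDER BY updated_time
        |    RANGE BETWEEN INTERVAL '3' DAY PRECEDING AND CURRENT ROW
        |  ) AS pCount
        |FROM praiseWithin3Days""".stripMargin)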