How does flink know which data is modified in dynamic table?

How does flink know which data is modified in dynamic table?

徐涛
Hi All,
With code like the following, if I use a retract stream, I think Flink is able to know which item was modified (if praise has 10000 items now and one new item arrives on the stream, only a very small amount of data is written to the sink):
	val praiseAggr = tableEnv.sqlQuery(s"SELECT article_id, hll(uid) AS PU FROM praise GROUP BY article_id")
	tableEnv.registerTable("finalTable", praiseAggr)
	tableEnv.sqlUpdate(s"INSERT INTO sinkTableName SELECT * FROM finalTable")

But if I use the following SQL, which adds a dynamic timestamp field:
	val praiseAggr = tableEnv.sqlQuery(s"SELECT article_id, hll(uid) AS PU, LOCALTIMESTAMP AS update_timestamp FROM praise GROUP BY article_id")
is the whole table flushed to the sink, or is only the incremental change flushed? Why?

Thanks,
Henry

Re: How does flink know which data is modified in dynamic table?

Hequn Cheng
Hi Henry,

Both SQL queries produce their output incrementally.

However, there are some problems if you use a retract sink. You have to pay attention to the timestamp field, because its value is different each time it is evaluated.
For example, if the value is updated from 1 to 2, 
previous row:  add (a, 1, 2018-08-20 20:18:10.286)
retract row: delete (a, 1, 2018-08-20 20:18:10.386)
new row: add (a, 2, 2018-08-20 20:18:10.486)
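The retract row differs from the previous row because of the time field, so a sink that matches retractions against the whole row will not find the row it is supposed to delete.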

Of course, this problem should be fixed later.

Best, Hequn
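
The mismatch in the example above can be reproduced with a small stand-alone sketch. It does not use Flink at all: `Change`, `WholeRowSink`, and `run` are hypothetical names made up for illustration, and the sink is assumed to apply a retraction by searching for an identical stored row.

```scala
import scala.collection.mutable

object RetractByRowSketch {
  // One change message: isAdd = true is an "add", false is a "delete" (retract).
  final case class Change(isAdd: Boolean, key: String, value: Int, ts: String)

  // Hypothetical sink that applies a retraction by looking for an identical row.
  final class WholeRowSink {
    val rows = mutable.ArrayBuffer.empty[(String, Int, String)]
    var unmatchedRetractions = 0

    def write(c: Change): Unit = {
      val row = (c.key, c.value, c.ts)
      if (c.isAdd) {
        rows += row
      } else {
        val i = rows.indexOf(row)
        if (i >= 0) { rows.remove(i); () } // exact match found: delete it
        else unmatchedRetractions += 1     // no identical row: nothing is deleted
      }
    }
  }

  // The three messages from the example in this thread.
  val messages: Seq[Change] = Seq(
    Change(isAdd = true, "a", 1, "2018-08-20 20:18:10.286"),
    Change(isAdd = false, "a", 1, "2018-08-20 20:18:10.386"),
    Change(isAdd = true, "a", 2, "2018-08-20 20:18:10.486")
  )

  def run(): WholeRowSink = {
    val sink = new WholeRowSink
    messages.foreach(sink.write)
    sink
  }
}
```

Under these assumptions the stale row (a, 1, ...) stays in the sink next to the new row (a, 2, ...), because the retract message carries a timestamp that was never stored.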

Re: How does flink know which data is modified in dynamic table?

徐涛
Hi Hequn,
However, is that semantically correct? The SQL result is not equal to what the same query would return on the bounded table.


Re: How does flink know which data is modified in dynamic table?

Hequn Cheng
Hi Henry,

If you upsert by the key 'article_id', the result is correct, i.e., the result is (a, 2, 2018-08-20 20:18:10.486). What do you think?

Best, Hequn
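
As a rough illustration of the upsert-by-key idea, here is a minimal sketch in plain Scala with no Flink dependency. `UpsertSink` and `Change` are hypothetical names, and the sink is simply assumed to receive the same three messages as in the earlier example and to look only at `article_id` when applying them.

```scala
import scala.collection.mutable

object UpsertByKeySketch {
  final case class Change(isAdd: Boolean, articleId: String, pu: Int, updateTimestamp: String)

  // Hypothetical upsert sink keyed by article_id: an add overwrites the row for
  // its key, a delete removes the row for its key regardless of the other fields.
  final class UpsertSink {
    val table = mutable.LinkedHashMap.empty[String, (Int, String)]

    def write(c: Change): Unit = {
      if (c.isAdd) table.update(c.articleId, (c.pu, c.updateTimestamp))
      else table.remove(c.articleId)
      ()
    }
  }

  def run(): Map[String, (Int, String)] = {
    val sink = new UpsertSink
    Seq(
      Change(isAdd = true, "a", 1, "2018-08-20 20:18:10.286"),
      Change(isAdd = false, "a", 1, "2018-08-20 20:18:10.386"),
      Change(isAdd = true, "a", 2, "2018-08-20 20:18:10.486")
    ).foreach(sink.write)
    sink.table.toMap
  }
}
```

Because only the key is compared, the differing timestamp on the delete message no longer matters, and a single row (a, 2, 2018-08-20 20:18:10.486) remains.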



Re: How does flink know which data is modified in dynamic table?

徐涛
Hi Hequn,
Maybe I did not express myself clearly. I mean that updating only the update_timestamp of the incremental data is not enough. The SQL expresses the idea that “all the timestamps in the table are the same”, but in fact each row in the table may have a different one. That is a bit weird.

Best, Henry



Re: How does flink know which data is modified in dynamic table?

Hequn Cheng
Hi Henry,

You are right that, in MySQL, SYSDATE returns the time at which it executes, while LOCALTIMESTAMP returns a constant time that indicates when the statement began to execute.
But other database systems do not seem to have this constraint (correct me if I'm wrong). Sometimes we don't have to follow MySQL.

Best, Hequn
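
The two timestamp semantics contrasted in this message can be sketched without any database. Everything below is hypothetical illustration: `FakeClock` is a made-up deterministic clock that advances 100 ms on every read, `statementTime` models the "constant for the whole statement" behaviour, and `perRowTime` models evaluation each time a row is produced.

```scala
object TimestampSemanticsSketch {
  // Hypothetical deterministic clock: every read advances time by 100 ms.
  final class FakeClock(start: Long) {
    private var now = start
    def read(): Long = { val t = now; now += 100; t }
  }

  val keys: Seq[String] = Seq("a", "b", "c")

  // Statement-start semantics: read the clock once, reuse the value for every row.
  def statementTime(clock: FakeClock): Seq[(String, Long)] = {
    val ts = clock.read()
    keys.map(k => (k, ts))
  }

  // Per-evaluation semantics: read the clock each time a row is produced.
  def perRowTime(clock: FakeClock): Seq[(String, Long)] =
    keys.map(k => (k, clock.read()))
}
```

With the first function every row carries the same timestamp, which is the "all timestamps in the table are the same" reading of the query. With the second, rows produced at different moments carry different values, which corresponds to the behaviour discussed for the streaming query.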

Re: How does flink know which data is modified in dynamic table?

徐涛
Hi Hequn,
Another question: in some cases I think updating the timestamp of the retract row is reasonable. For example, some users do not want a hard delete but a soft delete, so my code only performs a soft delete when a retract row arrives. In that case I want the update_timestamp to be different, so that the ETL program can tell that this row has changed.


    For example, if the value is updated from 1 to 2, 
previous row:  add (a, 1, 2018-08-20 20:18:10.286)
retract row: delete (a, 1, 2018-08-20 20:18:10.386)
new row: add (a, 2, 2018-08-20 20:18:10.486)
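
A minimal sketch of this soft-delete idea, in plain Scala and independent of Flink: `SoftDeleteSink`, `StoredRow`, and the `deleted` flag are hypothetical names chosen for illustration, and rows are assumed to be looked up by `article_id`.

```scala
import scala.collection.mutable

object SoftDeleteSketch {
  final case class Change(isAdd: Boolean, articleId: String, pu: Int, updateTimestamp: String)
  final case class StoredRow(pu: Int, updateTimestamp: String, deleted: Boolean)

  // Hypothetical sink: a retract message does not remove the row; it marks the
  // row as deleted and stamps it with the timestamp carried by the retract message.
  final class SoftDeleteSink {
    val table = mutable.Map.empty[String, StoredRow]

    def write(c: Change): Unit =
      if (c.isAdd) {
        table.update(c.articleId, StoredRow(c.pu, c.updateTimestamp, deleted = false))
      } else {
        table.get(c.articleId).foreach { old =>
          table.update(c.articleId, old.copy(updateTimestamp = c.updateTimestamp, deleted = true))
        }
      }
  }

  // State of row "a" after the add and the retract from the example above.
  def afterRetract(): StoredRow = {
    val sink = new SoftDeleteSink
    sink.write(Change(isAdd = true, "a", 1, "2018-08-20 20:18:10.286"))
    sink.write(Change(isAdd = false, "a", 1, "2018-08-20 20:18:10.386"))
    sink.table("a")
  }
}
```

Here the newer timestamp on the retract message is what lets a downstream ETL job notice that the row changed, even though the value itself is still 1.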

Re: How does flink know which data is modified in dynamic table?

Hequn Cheng
Hi,

You are right. We can make use of it to do a soft delete.
But there will be problems in other cases, for example when a sink applies retract messages by matching the whole row. I opened a Jira issue [1] about this problem. Thanks for bringing up this discussion.


Best, Hequn
