Is it possible to handle late data when using table API?

Lasse Nedergaard
Hi,

I have a simple tumbling window working on event time, like this:

Table res30MinWindows = machineInsights
    .window(Tumble.over("30.minutes").on("UserActionTime").as("w")) // define window
    .groupBy("machineId, machineInsightId, w") // group by key and window
    .select("machineId, machineInsightId, w.start, w.end, w.rowtime, value.max as max"); // access window properties and aggregate

As we work with IoT units, we don't have 100% control over the event time they report, so we need to handle late data to make sure our calculations aren't wrong.
I would like to know whether the Table API has any option for getting access to late data, or whether my only option is to use the streaming API.

Thanks in advance,
Lasse Nedergaard


Re: Is it possible to handle late data when using table API?

JingsongLee
Hi @Lasse Nedergaard, the Table API doesn't have an allowedLateness API.
But you can set rowtime.watermarks.delay on the source to slow down the watermark clock.


Re: Is it possible to handle late data when using table API?

JingsongLee
To set the rowtime watermark delay on the source, you can do this:

val desc = Schema()
  .field("a", Types.INT)
  .field("e", Types.LONG)
  .field("f", Types.STRING)
  .field("t", Types.SQL_TIMESTAMP)
  .rowtime(Rowtime().timestampsFromField("t").watermarksPeriodicBounded(1000)) // delay in milliseconds

Use the watermarksPeriodicBounded API to set the delay, and then call XXTableFactory.createStreamTableSource(desc.toProperties).
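
To make the effect of that delay concrete, here is a minimal plain-Java sketch (not Flink code) of a bounded-delay watermark: the watermark trails the highest timestamp seen so far by a fixed delay, and a record is dropped once the watermark has passed the end of its window. The class name BoundedWatermarkSketch and its methods are made up for illustration, and the sketch assumes the watermark advances on every record, whereas a periodic watermark is only emitted at intervals.

```java
/** Illustrative model of a bounded-delay watermark; hypothetical class, not part of Flink. */
final class BoundedWatermarkSketch {
    private final long delayMs;
    private final long windowSizeMs;
    private long maxTimestampMs = Long.MIN_VALUE; // no record seen yet

    BoundedWatermarkSketch(long delayMs, long windowSizeMs) {
        this.delayMs = delayMs;
        this.windowSizeMs = windowSizeMs;
    }

    /** Current watermark: highest timestamp seen so far minus the configured delay. */
    long watermark() {
        return maxTimestampMs == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestampMs - delayMs;
    }

    /**
     * Feeds one record into the model.
     * Returns true if the record's tumbling window is still open, i.e. the watermark
     * has not yet reached the window's last timestamp; returns false if it is late.
     */
    boolean accept(long eventTimeMs) {
        long windowStart = eventTimeMs - Math.floorMod(eventTimeMs, windowSizeMs);
        long windowMaxTimestamp = windowStart + windowSizeMs - 1;
        boolean onTime = windowMaxTimestamp > watermark();
        maxTimestampMs = Math.max(maxTimestampMs, eventTimeMs);
        return onTime;
    }
}
```

With 30-minute windows and a delay of 1000 ms, a record that is 600 ms behind the newest one is still accepted, while with a delay of 0 the same record is dropped. The flip side is that every window result is held back by the full delay, so covering records that arrive days late would mean a delay of days.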


Re: Is it possible to handle late data when using table API?

Lasse Nedergaard
In reply to this post by JingsongLee
Hi,

Thanks for the fast reply. Unfortunately, that is not an option: some devices can deliver data that is days old, and I would like to have the results as fast as possible.
I will have to convert my implementation to use the streaming API instead.

Best regards,
Lasse Nedergaard


Re: Is it possible to handle late data when using table API?

Hequn Cheng
Hi Lasse,

> some devices can deliver data days back in time and I would like to have the results as fast as possible.

What JingsongLee said is correct.

However, based on your description, it is possible to handle your problem with the Table API: you can use a non-windowed (unbounded) aggregate [1].
A non-windowed aggregate supports early firing, i.e., it outputs a result immediately whenever there is an update, so you can "have the results as fast as possible". The query looks like this:

Table res30MinWindows = machineInsights
    // assumes UserActionTime is an integer epoch timestamp in seconds, so the division truncates
    .select("UserActionTime / (30 * 60) as windowId, machineId, machineInsightId, value")
    .groupBy("windowId, machineId, machineInsightId")
    .select("machineId, machineInsightId, windowId * 1800 as wStart, (windowId + 1) * 1800 as wEnd, value.max as max");

The one thing to be aware of is that a non-windowed aggregate keeps all of its (result) data in state, so the state required to compute the query result might grow without bound, depending on the type of aggregation and the number of distinct grouping keys. To solve this problem, you can provide a query configuration with a suitable retention interval to prevent excessive state size [2].
In your case, I think a suitable retention interval would be the maximum delay of your data.
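
As a rough illustration of the idea, here is a plain-Java sketch (not Flink code) of a max aggregate keyed by 30-minute bucket that emits an updated result as soon as a record changes it, including records that arrive late, and that drops buckets that have been idle for longer than a retention interval. The class EarlyFireMaxSketch, its methods, and the use of epoch seconds are all assumptions made for the example; Flink's own state retention works on its internal state, not on a map like this.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalDouble;

/** Illustrative early-firing max aggregate with idle-state cleanup; hypothetical, not Flink API. */
final class EarlyFireMaxSketch {
    private static final long BUCKET_SECONDS = 30 * 60;

    /** Per-key state: the running max and the time the key was last touched. */
    private static final class Entry {
        double max;
        long lastUpdatedMs;
    }

    private final Map<String, Entry> state = new HashMap<>();
    private final long retentionMs;

    EarlyFireMaxSketch(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    /** Start of the 30-minute bucket (in epoch seconds) that contains the given event time. */
    static long bucketStart(long epochSeconds) {
        return Math.floorDiv(epochSeconds, BUCKET_SECONDS) * BUCKET_SECONDS;
    }

    /**
     * Applies one record. Returns the new max if this record changed the result
     * for its bucket (the "early fire"), or empty if the result is unchanged.
     */
    OptionalDouble update(String machineId, String machineInsightId,
                          long eventEpochSeconds, double value, long nowMs) {
        String key = machineId + "|" + machineInsightId + "|" + bucketStart(eventEpochSeconds);
        Entry entry = state.get(key);
        if (entry == null) {
            entry = new Entry();
            entry.max = value;
            entry.lastUpdatedMs = nowMs;
            state.put(key, entry);
            return OptionalDouble.of(value);
        }
        entry.lastUpdatedMs = nowMs;
        if (value > entry.max) {
            entry.max = value;
            return OptionalDouble.of(value);
        }
        return OptionalDouble.empty();
    }

    /** Removes buckets idle for at least the retention interval; returns how many were dropped. */
    int expire(long nowMs) {
        int before = state.size();
        state.values().removeIf(e -> nowMs - e.lastUpdatedMs >= retentionMs);
        return before - state.size();
    }
}
```

A record for an old bucket simply updates that bucket's max and fires a new result, as long as the bucket has not yet been expired. That is why the retention interval needs to be at least as long as the largest delay you expect.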

Best, Hequn



Re: Is it possible to handle late data when using table API?

Lasse Nedergaard
Hi Hequn,

Thanks for the details. I will give it a try.

Best regards,
Lasse Nedergaard

