Hi.
I have a simple tumble window working on event time, like this:

Table res30MinWindows = machineInsights

As we work with IoT units we don't have 100% control over the event time reported, and therefore we need to handle late data to make sure our calculations aren't wrong. I would like to know if there is any option in the Table API to get access to late data, or is my only option to use the Streaming API?

Thanks in advance
Lasse Nedergaard
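For context, a 30-minute tumbling event-time window in the Table API looks roughly like the sketch below (Java, Flink 1.6-era API). The column names match the query quoted later in the thread; everything else is assumed rather than taken from the original message, and the package of Tumble can differ between Flink versions.

    // Sketch of a 30-minute tumbling window on the event-time attribute UserActionTime.
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.Tumble;

    Table res30MinWindows = machineInsights
        .window(Tumble.over("30.minutes").on("UserActionTime").as("w"))
        .groupBy("w, machineId, machineInsightId")
        .select("machineId, machineInsightId, w.start as wStart, w.end as wEnd, value.max as max");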
Hi
@Lasse Nedergaard, the Table API doesn't have an allowedLateness API. But you can set rowtime.watermarks.delay on the source to slow down the watermark clock.
To set the rowtime watermark delay on the source you can: create a schema descriptor (val desc = new Schema()), use the watermarksPeriodicBounded API to set the delay, and then call XXTableFactory.createStreamTableSource(desc.toProperties).
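Spelled out, that descriptor approach would look roughly like this (a Java sketch of the Flink 1.6-era descriptor API; the field names, the 30-minute delay, and XXTableFactory as a placeholder for the concrete connector factory are all assumptions):

    // Declare the rowtime attribute and a bounded watermark delay on the source schema.
    import org.apache.flink.table.descriptors.Rowtime;
    import org.apache.flink.table.descriptors.Schema;

    Schema desc = new Schema()
        .field("machineId", "VARCHAR")
        .field("machineInsightId", "VARCHAR")
        .field("value", "DOUBLE")
        .field("UserActionTime", "TIMESTAMP")
        .rowtime(new Rowtime()
            .timestampsFromField("UserActionTime")
            // Accept events up to 30 minutes behind the highest timestamp seen so far.
            .watermarksPeriodicBounded(30 * 60 * 1000));

    // Hand the properties to the connector-specific factory:
    // XXTableFactory.createStreamTableSource(desc.toProperties());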
Hi
Thanks for the fast reply. Unfortunately it's not an option, as some devices can deliver data days back in time and I would like to have the results as fast as possible. I will have to convert my implementation to use the streaming API instead.
Med venlig hilsen / Best regards Lasse Nedergaard
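The streaming-API route mentioned above would look roughly like the sketch below: an event-time tumbling window with allowedLateness plus a side output that gives explicit access to the late records. The MachineInsight type, the three-day lateness, and the field names are illustrative assumptions, not taken from the thread.

    // Sketch: 30-minute event-time windows, late updates re-fire the window for 3 days,
    // and anything later than that is captured in a side output.
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.OutputTag;

    OutputTag<MachineInsight> lateTag = new OutputTag<MachineInsight>("late-machine-insights") {};

    SingleOutputStreamOperator<MachineInsight> windowed = machineInsightStream
        .keyBy(insight -> insight.getMachineId())
        .window(TumblingEventTimeWindows.of(Time.minutes(30)))
        .allowedLateness(Time.days(3))   // late events within 3 days update the emitted result
        .sideOutputLateData(lateTag)     // events later than that go to the side output
        .maxBy("value");

    // Late records can be read here and merged into the results separately.
    DataStream<MachineInsight> lateData = windowed.getSideOutput(lateTag);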
Hi Lasse,

> some devices can deliver data days back in time and I would like to have the results as fast as possible.

What JingsongLee said is correct. However, it is possible to handle your problem with the Table API, given your description above. You can use a non-window (or unbounded) aggregate [1]. The non-window aggregate supports early firing, i.e., it outputs results immediately once there is an update, so you can "have the results as fast as possible". The query looks like:

Table res30MinWindows = machineInsights
    .select("UserActionTime / (30 * 60) as windowId, machineId, machineInsightId, value")
    .groupBy("windowId, machineId, machineInsightId")
    .select("machineId, machineInsightId, windowId as wStart, windowId + 1800 as wEnd, value.max as max")

The only thing to be aware of is that, because the non-window aggregate keeps all (result) data in its state, the state required to compute the query result might grow infinitely, depending on the type of aggregation and the number of distinct grouping keys. To prevent excessive state size, you can provide a query configuration with a valid retention interval [2]. In your case, I think a valid retention interval would be the maximum delay interval of your data.

Best,
Hequn

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/tableApi.html#aggregations
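To make the retention interval concrete, the query configuration could be set up roughly as follows (a sketch against the Flink 1.6-era API; the three-day retention stands in for "the maximum delay interval of your data", and tableEnv and res30MinWindows are assumed from the setup above):

    // Keep per-key state for at least 3 days after its last update, clean it up within 4.
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.table.api.StreamQueryConfig;
    import org.apache.flink.types.Row;

    StreamQueryConfig qConfig = tableEnv.queryConfig();
    qConfig.withIdleStateRetentionTime(Time.days(3), Time.days(4));

    // Apply the configuration when emitting the continuously updated (retract) result.
    tableEnv.toRetractStream(res30MinWindows, Row.class, qConfig).print();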
Hi Hequn
Thanks for the details. I will give it a try.
Med venlig hilsen / Best regards Lasse Nedergaard