How to emit changed data only w/ Flink trigger?

Qi Kang
Hi all,


We have a Flink job that aggregates the sales volume and GMV (gross merchandise volume) of each site on a daily basis. The code skeleton is as follows:


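```
sourceStream
 // Parse each raw JSON message into an OrderDetail object
 .map(message -> JSON.parseObject(message, OrderDetail.class))
 // Partition by site so each site gets its own window and aggregate (keyBy, not keyby)
 .keyBy("siteId")
 // One-day tumbling windows; the -8 h offset aligns day boundaries to local midnight in UTC+8
 .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
 // Fire the still-open daily window every second to refresh the dashboard
 .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
 .aggregate(new VolumeGmvAggregateFunc());
```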


The window is triggered every second to refresh the data displayed on a real-time dashboard. Is there a way to output only the data of those sites whose aggregates changed within the last second? We currently have 1000+ sites, so emitting every aggregation record every second seems rather expensive.
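For a rough sense of the cost (an upper bound that assumes about 1,000 sites, each receiving at least one order early in the day; `ContinuousProcessingTimeTrigger` keeps firing a window every interval once that window has seen an element, whether or not anything new arrived), the one-second trigger produces per day:

$$
\text{records per day} \approx N_{\text{sites}} \times \frac{T_{\text{window}}}{T_{\text{trigger}}} = 1000 \times \frac{86\,400\ \text{s}}{1\ \text{s}} = 8.64 \times 10^{7}
$$

That is roughly 86.4 million emitted aggregates per day, or about 1,000 per second, even when only a handful of sites actually changed.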


BR, Qi Kang



Re: How to emit changed data only w/ Flink trigger?

taher koitawala-2
You can do this by writing a custom trigger or evictor.
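The custom-trigger idea can be sketched as a "dirty flag" per window: set it when an element arrives, and on each 1-second timer fire only if it is set, then clear it. To keep the example runnable without a Flink dependency, the sketch below is a plain-Java model of that logic, not Flink code; `ChangedOnlyTriggerSketch`, `SiteAgg`, `onTick` and the volume/GMV fields are hypothetical names. In an actual job, the same idea would presumably live in a subclass of Flink's `Trigger` that keeps the flag in state obtained via `TriggerContext.getPartitionedState`, registers timers with `registerProcessingTimeTimer`, and returns `TriggerResult.FIRE` from `onProcessingTime` only when the flag is set (`TriggerResult.CONTINUE` otherwise).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java model (no Flink dependency) of a hypothetical "changed-only" trigger.
 * Each per-site window keeps a running aggregate plus a dirty flag: the flag is set
 * when an element arrives and cleared when the window fires, so a periodic tick
 * only emits sites that received data since the previous tick.
 */
public class ChangedOnlyTriggerSketch {

    /** Running aggregate for one site's daily window (hypothetical accumulator). */
    static final class SiteAgg {
        long volume;
        double gmv;
        boolean dirty; // in a real Trigger this would be per-window partitioned state
    }

    // Insertion-ordered so emitted output is deterministic.
    private final Map<String, SiteAgg> windows = new LinkedHashMap<>();

    /** Analogous to Trigger.onElement: update the aggregate and mark the window dirty. */
    public void onElement(String siteId, long quantity, double amount) {
        SiteAgg agg = windows.computeIfAbsent(siteId, k -> new SiteAgg());
        agg.volume += quantity;
        agg.gmv += amount;
        agg.dirty = true;
    }

    /** Analogous to the 1-second timer: emit only dirty windows, then clear their flags. */
    public List<String> onTick() {
        List<String> emitted = new ArrayList<>();
        for (Map.Entry<String, SiteAgg> e : windows.entrySet()) {
            SiteAgg agg = e.getValue();
            if (agg.dirty) {
                emitted.add(e.getKey() + ":" + agg.volume + ":" + agg.gmv);
                agg.dirty = false;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        ChangedOnlyTriggerSketch sim = new ChangedOnlyTriggerSketch();
        sim.onElement("site-1", 2, 20.0);
        sim.onElement("site-2", 1, 5.0);
        System.out.println(sim.onTick()); // [site-1:2:20.0, site-2:1:5.0]
        sim.onElement("site-1", 1, 10.0);
        System.out.println(sim.onTick()); // [site-1:3:30.0]
        System.out.println(sim.onTick()); // []
    }
}
```

Because the aggregate is kept (the model never resets `volume`/`gmv`), each emission is still the running daily total, as with `TriggerResult.FIRE` in the original job. A real trigger would also need to clean up its flag state and pending timer in `Trigger.clear()` when the daily window expires.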

On Fri, Nov 1, 2019 at 3:08 PM Qi Kang <[hidden email]> wrote:

Re: How to emit changed data only w/ Flink trigger?

kant kodali
In reply to this post by Qi Kang
I am new to Flink, so I am not sure this is the correct answer, and you might want to wait for others to respond. But I think you should use

.inUpsertMode()
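For context, `inUpsertMode()` belongs to the Table API connector descriptors of this Flink era (the `connect(...)` style) and declares that the table and the external sink exchange upsert (and delete) messages keyed by a unique key, rather than an append-only stream. One way to read this suggestion (an assumption; the reply does not spell it out) is that a continuous, non-windowed `GROUP BY siteId` aggregation emits an update only for the key whose input changed, and an upsert sink overwrites that key's row. The plain-Java model below illustrates those semantics only; it is not Flink code, and `UpsertAggregationSketch`, `Upsert`, `onRow`, `changelog` and `sinkValue` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of a continuous GROUP BY siteId aggregation feeding an upsert sink. */
public class UpsertAggregationSketch {

    /** Hypothetical upsert message: the key plus the new aggregate for that key. */
    static final class Upsert {
        final String siteId;
        final double gmv;

        Upsert(String siteId, double gmv) {
            this.siteId = siteId;
            this.gmv = gmv;
        }

        @Override
        public String toString() {
            return siteId + "=" + gmv;
        }
    }

    private final Map<String, Double> state = new HashMap<>();     // per-key aggregate state
    private final Map<String, Double> sinkTable = new HashMap<>(); // what the dashboard would see
    private final List<Upsert> changelog = new ArrayList<>();      // messages actually sent to the sink

    /** Each input row updates only its own key and produces exactly one upsert for that key. */
    public void onRow(String siteId, double amount) {
        double updated = state.merge(siteId, amount, Double::sum);
        Upsert msg = new Upsert(siteId, updated);
        changelog.add(msg);
        sinkTable.put(msg.siteId, msg.gmv); // upsert: insert or overwrite by key
    }

    public List<Upsert> changelog() {
        return changelog;
    }

    public double sinkValue(String siteId) {
        return sinkTable.getOrDefault(siteId, 0.0);
    }

    public int sinkSize() {
        return sinkTable.size();
    }

    public static void main(String[] args) {
        UpsertAggregationSketch agg = new UpsertAggregationSketch();
        agg.onRow("site-1", 20.0);
        agg.onRow("site-2", 5.0);
        agg.onRow("site-1", 10.0);
        System.out.println(agg.changelog()); // [site-1=20.0, site-2=5.0, site-1=30.0]
    }
}
```

Note that this emits one update per input row for the affected site, rather than one per second for every site; whether that is cheaper than a per-second trigger depends on the order rate.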

On Fri, Nov 1, 2019 at 2:38 AM Qi Kang <[hidden email]> wrote: