a question on window trigger and delta output

kant kodali
Hi All,

I set up a transformation like the one below. The events in my stream have sequential timestamps (1, 2, 3, ...), and I am using event time with watermarks.

   myStream
       .keyBy(0)
       .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS))
       .aggregate(new myAggregateFunction())
       .print()

This prints output only when events arrive whose timestamps are multiples of 1000. This is not quite what I want, because I want to see some output every second. So I tried this:

   myStream
       .map(new PartitionMapper<>()).returns(typeInfo)
       .keyBy(0)
       .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS))
       .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))

This looked much better. However, it prints the entire state every second, which makes sense because that is what ContinuousProcessingTimeTrigger is supposed to do. What I am looking for is a combination of both: trigger only if there are some events in the window, with a maximum timeout of 1 second. If there are no events in the window, then don't trigger, because I don't want to see the same output every second.
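To make the behavior I am after concrete, here is a minimal plain-Java sketch of the firing logic. It is not Flink code: the class BoundedDelayTrigger and its methods are hypothetical names I made up for illustration, and time is passed in explicitly as milliseconds. My assumption is that this would map to a custom Trigger that registers a processing-time timer in onElement and returns FIRE from onProcessingTime, but I have not verified that.

```java
// Hypothetical model of the desired trigger; NOT part of the Flink API.
final class BoundedDelayTrigger {
    private final long timeoutMillis;
    // -1 means that no timer is currently registered.
    private long pendingTimer = -1L;

    BoundedDelayTrigger(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Called for every arriving element; starts the timeout only if none is pending. */
    void onElement(long nowMillis) {
        if (pendingTimer < 0) {
            pendingTimer = nowMillis + timeoutMillis;
        }
    }

    /** Called as processing time advances; returns true when the window should fire. */
    boolean onProcessingTime(long nowMillis) {
        if (pendingTimer >= 0 && nowMillis >= pendingTimer) {
            // Fire once, then stay silent until the next element arrives.
            pendingTimer = -1L;
            return true;
        }
        return false;
    }
}
```

With a timeout of 1000 ms, an element arriving at t=5200 leads to a single firing at t=6200, and nothing fires afterwards until another element shows up.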

What would be the right trigger here, that is, one that neither waits for the watermark nor prints the same output every second?

Also, I don't want to output the whole state in the transformation above. Instead, I only want to output the delta between the previous trigger firing and the current one. I looked into DeltaTrigger and DeltaEvictor, but I am confused about what the threshold parameter does, and I am looking for an explanation of how they work.
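As an illustration of the delta output I mean, here is a small plain-Java sketch. Again, this is not Flink code: DeltaEmitter and onFire are hypothetical names, and I am assuming for simplicity that the aggregate per key is a single long value (for example a count or a sum).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of "emit only the change since the previous firing"; NOT Flink API.
final class DeltaEmitter {
    // Last aggregate value that was emitted for each key.
    private final Map<String, Long> lastEmitted = new HashMap<>();

    /**
     * Called whenever the trigger fires for a key. Returns the difference to the
     * previously emitted aggregate, or empty if nothing changed.
     */
    Optional<Long> onFire(String key, long currentAggregate) {
        long previous = lastEmitted.getOrDefault(key, 0L);
        long delta = currentAggregate - previous;
        if (delta == 0L) {
            return Optional.empty(); // unchanged, so print nothing
        }
        lastEmitted.put(key, currentAggregate);
        return Optional.of(delta);
    }
}
```

For example, if the aggregate for a key goes 5, 5, 8 across three firings, I would want to see 5, then nothing, then 3.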

In other words, I want output similar to what retractStream() produces when an outer join between two tables is executed: it outputs only the delta, and only when there are some events (it doesn't wait for the watermark and doesn't print the same output every second).

Thanks!