How to use onTimer() on event stream for *ProcessFunction?

Felipe Gutierrez
Hi community,

I don't understand why KeyedProcessFunction.onTimer() is implemented differently here [1] than here [2]. Both are KeyedProcessFunctions, and both aim to fire a window on event time. In [1], the result is emitted only if timestamp == result.lastModified + 60000, and the timer is scheduled relative to ctx.timestamp().

public void processElement(...) {
    // ...
    current.lastModified = ctx.timestamp();
    ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
}
public void onTimer(...) {
    // get the state for the key that scheduled the timer
    CountWithTimestamp result = state.value();
    // check if this is an outdated timer or the latest timer
    if (timestamp == result.lastModified + 60000) {
        // emit the state on timeout
        out.collect(new Tuple2<String, Long>(result.key, result.count));
    }
}
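
To make the effect of the timestamp check in [1] concrete, here is a minimal plain-Java sketch of the state of a single key. It does not use the Flink API: the class LatestTimerSketch, the method advanceWatermark(), and the emitted list are hypothetical stand-ins for keyed state, the timer service, and the Collector. It assumes that an event-time timer fires once the watermark reaches its timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical simulation of approach [1] for ONE key; not Flink code.
final class LatestTimerSketch {
    static final long TIMEOUT_MS = 60_000L;

    private long lastModified;                              // stands in for CountWithTimestamp.lastModified
    private long count;                                     // stands in for CountWithTimestamp.count
    private final TreeSet<Long> timers = new TreeSet<>();   // pending event-time timers
    final List<Long> emitted = new ArrayList<>();           // stands in for the Collector

    // Every element updates lastModified and registers a new timer 60s later.
    void processElement(long eventTimestamp) {
        count++;
        lastModified = eventTimestamp;
        timers.add(lastModified + TIMEOUT_MS);
    }

    // Fire every pending timer whose timestamp the watermark has reached.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst());
        }
    }

    // Only the timer registered by the most recent element emits a result.
    private void onTimer(long timestamp) {
        if (timestamp == lastModified + TIMEOUT_MS) {
            emitted.add(count);
        }
    }

    public static void main(String[] args) {
        LatestTimerSketch key = new LatestTimerSketch();
        key.processElement(1_000L);      // registers a timer at 61_000
        key.processElement(30_000L);     // registers a timer at 90_000
        key.advanceWatermark(61_000L);   // outdated timer fires, nothing is emitted
        key.advanceWatermark(90_000L);   // latest timer fires, the count is emitted
        System.out.println(key.emitted); // prints [2]
    }
}
```

Each element still registers its own timer, but only the timer that matches the latest lastModified produces output, so a key emits once after 60 seconds of event-time inactivity.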

In [2], onTimer() does no time comparison. In addition, timers are scheduled using the formula (eventTime - (eventTime % durationMsec) + durationMsec - 1), and only for events that are not late with respect to the watermark, that is, events for which eventTime <= timerService.currentWatermark() is false.

public void processElement(...) {
    if (eventTime <= timerService.currentWatermark()) {
        // This event is late; its window has already been triggered.
    } else {
        // Round up eventTime to the end of the window containing this event.
        long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1);
        // ...
    }
}
public void onTimer(...) {
    Float sumOfTips = this.sumOfTips.get(timestamp);
    // ...
}
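
The rounding formula in [2] can be checked with a few numbers. The sketch below is plain Java with no Flink dependency. The class name WindowEnd and the one-hour duration are illustrative assumptions, and it assumes non-negative timestamps, since Java's % operator returns a negative remainder for negative operands.

```java
// Hypothetical helper that isolates the arithmetic used in [2]; not Flink code.
final class WindowEnd {
    // Last millisecond of the fixed-size window that contains eventTime.
    static long endOfWindow(long eventTime, long durationMsec) {
        return eventTime - (eventTime % durationMsec) + durationMsec - 1;
    }

    // The lateness test from [2]: the event's window has already been triggered.
    static boolean isLate(long eventTime, long currentWatermark) {
        return eventTime <= currentWatermark;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L; // assumed window duration of one hour
        System.out.println(endOfWindow(0L, hour));         // prints 3599999
        System.out.println(endOfWindow(3_599_999L, hour)); // prints 3599999
        System.out.println(endOfWindow(3_600_000L, hour)); // prints 7199999
        System.out.println(isLate(1_000L, 3_599_999L));    // prints true
    }
}
```

All events of one window map to the same timer timestamp, so the timer service ends up with a single timer per key and window instead of one timer per element.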


My use case uses a CoProcessFunction, and I keep the state in ListState. It works fine with approach [1]. With approach [2], some events are treated as late because of the watermark.

Which approach is correct? Or which is better?

Afterwards, I have to make this function fault tolerant. So my next question is: do I have to implement CheckpointedFunction and CheckpointedRestoring, or does the function already recover from failures because I am using ListState?

Thanks,
Felipe



--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
Re: How to use onTimer() on event stream for *ProcessFunction?

Arvid Heise-4
Let's start in reverse: you don't need to implement CheckpointedFunction if you use managed state (ListState is managed).

Now to the question of how you should implement onTimer. That's up to you and heavily depends on your use case.
The first onTimer implementation is called 60s (in event time) after an element of key X has been processed. If you have additional elements with key X, you get additional timers. However, in this example, only the latest timer should actually output data (think of some session detection). That's why the implementation checks whether it is indeed the latest timer before outputting elements.

The other implementation always outputs elements, independently of any additional timers or elements being added.
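
For contrast with the first implementation, here is a rough plain-Java sketch of the second style for a single key. It is a simulation, not Flink code: WindowSumSketch, advanceWatermark(), emitted, and droppedLate are hypothetical names, a HashMap stands in for the map state behind sumOfTips, and removing the entry after emitting is an assumption about the elided part of onTimer().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical simulation of approach [2] for ONE key; not Flink code.
final class WindowSumSketch {
    private final long durationMsec;
    private long currentWatermark = Long.MIN_VALUE;
    private final Map<Long, Float> sumOfTips = new HashMap<>(); // end of window -> sum
    private final TreeSet<Long> timers = new TreeSet<>();       // pending event-time timers
    final List<Float> emitted = new ArrayList<>();              // stands in for the Collector
    int droppedLate = 0;

    WindowSumSketch(long durationMsec) {
        this.durationMsec = durationMsec;
    }

    void processElement(long eventTime, float tip) {
        if (eventTime <= currentWatermark) {
            droppedLate++; // late: the window of this event has already been triggered
            return;
        }
        long endOfWindow = eventTime - (eventTime % durationMsec) + durationMsec - 1;
        timers.add(endOfWindow); // a set, so one timer per window
        sumOfTips.merge(endOfWindow, tip, Float::sum);
    }

    void advanceWatermark(long watermark) {
        currentWatermark = watermark;
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst());
        }
    }

    // No comparison needed: the timer timestamp identifies the window to emit.
    private void onTimer(long timestamp) {
        Float sum = sumOfTips.get(timestamp);
        emitted.add(sum);
        sumOfTips.remove(timestamp); // assumed cleanup of the finished window
    }

    public static void main(String[] args) {
        WindowSumSketch key = new WindowSumSketch(1_000L);
        key.processElement(100L, 1.0f);
        key.processElement(900L, 2.5f);
        key.processElement(1_200L, 4.0f);
        key.advanceWatermark(999L);          // window [0, 999] fires
        key.processElement(500L, 9.0f);      // behind the watermark, dropped as late
        key.advanceWatermark(1_999L);        // window [1000, 1999] fires
        System.out.println(key.emitted);     // prints [3.5, 4.0]
        System.out.println(key.droppedLate); // prints 1
    }
}
```

Every timer that fires produces output here, and an element that arrives behind the watermark never reaches its window, which matches the late events described in the question.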

(In reply to Felipe Gutierrez's message of Wed, Jun 16, 2021, 4:08 PM.)

Re: How to use onTimer() on event stream for *ProcessFunction?

Felipe Gutierrez
I didn't know that I don't need to implement CheckpointedFunction if I use ListState. However, I considered this answer (https://stackoverflow.com/a/47443537/2096986) where Fabian says:

"You can store parts of the operator state also in the ListState (instead of holding it on the heap) but it will be quite expensive to access individual elements because you have to traverse an iterator."

So maybe implementing CheckpointedFunction and saving the state in snapshotState() is still better? Or maybe not? What do you think?

Would you please consider shedding some light on this question about CoProcessFunction and event-time triggers after a job failure? (https://lists.apache.org/x/thread.html/r5f74099a7b91b4ad47ac7612631f7e03d08c0e1d374487da55aa1a31@%3Cuser.flink.apache.org%3E)

Thank you very much!
Felipe



(In reply to Arvid Heise's message of Thu, Jun 17, 2021, 4:38 PM.)