How to make onTimer() trigger on a CoProcessFunction after a failure?

Felipe Gutierrez (Jun 17, 2021, 13:46)
Hi community,

I have implemented a join using a CoProcessFunction that also implements CheckpointedFunction to recover from failures. I added some debug lines to check whether it restores its state, and it does. Before the crash, I process events that arrive at processElement2(). I take snapshots in snapshotState(), and when the application comes back, it restores those events. That part is fine.

After the restore, I process events that arrive at processElement1(). I register event-time timers for them, as I did in processElement2() before the crash, but onTimer() is never called. The problem is that I don't have any new events to send to processElement2() that would make the CoProcessFunction register a timer for them; they were all sent before the crash.

I suppose that onTimer() is called only when "timerService.registerEventTimeTimer(endOfWindow);" has been called from both processElement1() and processElement2(), because when I run the same application without a crash, the CoProcessFunction does trigger onTimer().

But if there is a crash in the middle, the CoProcessFunction does not call onTimer(). Why is that? Is that normal? What do I have to do to make the CoProcessFunction trigger onTimer() when only one stream is being processed (let's say at processElement2()) and the other stream was restored from a snapshot? I imagine that I have to register a timer during recovery (in initializeState()). But how?

thanks,
Felipe

Re: How to make onTimer() trigger on a CoProcessFunction after a failure?

Piotr Nowojski (Jun 18, 2021, 11:48)
Hi,

As far as I can tell, timers should be checkpointed and recovered. What may be happening is that the last watermark an operator has seen on each of its inputs, and on each channel within an input, is not persisted. Since an operator's event time is the minimum of those watermarks, it starts again from scratch after a restart, and Flink assumes that the watermark assigners will emit new watermarks after recovery. However, if one of your inputs is dormant and emitted some very high watermark long before the failure, then after recovery, as long as no new watermark is emitted on it, this input (or input channel) may be preventing timers from firing. Can you check whether that's what's happening in your case?

If so, you would have to make sure, one way or another, that some watermarks are emitted after recovery. As a last resort, you could manually store the watermarks in the operator's or source's state and re-emit the last seen watermark during recovery.
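The effect described above can be illustrated with a small plain-Java model. It is a hypothetical sketch, not Flink's implementation: `TwoInputWatermarkModel` and its methods are made up for illustration, and the model only assumes the rules stated here, namely that a two-input operator's watermark is the minimum of its inputs' watermarks, that per-input watermarks start again from `Long.MIN_VALUE` after a restart, and that an event-time timer fires once the combined watermark reaches the timer's timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Hypothetical model (not Flink code) of a freshly restarted two-input operator.
 * Per-input watermarks are not restored, so both start at Long.MIN_VALUE.
 */
final class TwoInputWatermarkModel {
    private final long[] inputWatermarks = {Long.MIN_VALUE, Long.MIN_VALUE};
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private final List<Long> firedTimers = new ArrayList<>();

    /** Timers themselves survive recovery, so they can be registered at any point. */
    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    /** A watermark arrives on input 0 or 1; fire every timer that is now due. */
    void processWatermark(int input, long watermark) {
        // Watermarks on a single input never move backwards.
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        long combined = currentWatermark();
        while (!timers.isEmpty() && timers.peek() <= combined) {
            firedTimers.add(timers.poll());
        }
    }

    /** The operator's event-time clock: the minimum over both inputs. */
    long currentWatermark() {
        return Math.min(inputWatermarks[0], inputWatermarks[1]);
    }

    List<Long> firedTimers() {
        return firedTimers;
    }
}
```

For example, after a restart, registering a timer at 500 and advancing input 0 to 600 fires nothing while input 1 stays silent, because the combined watermark is still `Long.MIN_VALUE`. A single watermark on input 1 (say, a replayed 1000) moves the combined watermark to 600 and the timer at 500 fires.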

Best,
Piotrek

Re: How to make onTimer() trigger on a CoProcessFunction after a failure?

Felipe Gutierrez (Jun 18, 2021, 12:49)
Hello Piotrek,

> On Fri, Jun 18, 2021 at 11:48 AM Piotr Nowojski wrote:
> As far as I can tell timers should be checkpointed and recovered. What may be happening is that the state of the last seen watermarks by operators on different inputs and different channels inside an input is not persisted. Flink is assuming that after the restart, watermark assigners will emit newer watermarks after the recovery. However if one of your inputs is dormant and it has already emitted some very high watermark long time before the failure, after recovery if no new watermark is emitted, this input/input channel might be preventing timers from firing. Can you check if that's what's happening in your case?

I think you are correct. At least when I reproduce the bug, it behaves as you described.

> If so you would have to make sure one way or another that some watermarks will be emitted after recovery. As a last resort, you could manually store the watermarks in the operators/sources state and re-emit last seen watermark during recovery.

Could you please point out how I can checkpoint the watermarks on a source operator? Is it done by the code below, taken from here (https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector)?

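import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<MyType>forBoundedOutOfOrderness(Duration.ofSeconds(20)));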

Thanks,
Felipe

Re: How to make onTimer() trigger on a CoProcessFunction after a failure?

Piotr Nowojski (Jun 18, 2021, 13:41)
Hi,

Keep in mind that this is quite a low-level approach to the problem. It would be much better to make sure that watermarks are still being emitted after recovery.

If you are using a built-in source, it's probably easier to do it in a custom operator. I would try to implement one based on `AbstractStreamOperator`; your class would also need to implement the `OneInputStreamOperator` interface. You could implement `processElement` as an identity function (just pass the stream element downstream unchanged). In `processWatermark`, you would need to store the latest watermark in a `ListState<Long>` field (you can declare it inside `AbstractStreamOperator#initializeState` via `context.getOperatorStateStore().getListState(new ListStateDescriptor<>("your-field-name", Long.class));`).

During normal processing (`processWatermark`), make sure it's a singleton list. During recovery without rescaling (`AbstractStreamOperator#initializeState()`), you would just access this state field and re-emit the only element on that list. However, when recovering with a different parallelism, you can end up with either (a) an empty list when scaling up (in that case you cannot emit anything), or (b) many elements on the list when scaling down (in that case you would need to compute the minimum of all elements).

As the operator API is not a very official one, it's not well documented. For examples, you would need to look at the Flink code itself and find existing implementations of `AbstractStreamOperator` or `OneInputStreamOperator`.
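The recovery rules above can be checked without a cluster using a plain-Java model of the bookkeeping. This is a hypothetical sketch with no Flink dependency: `LastWatermarkBookkeeping`, `onWatermark`, `snapshot` and `watermarkToReEmit` are made-up names, a `List<Long>` stands in for the operator's `ListState<Long>`, and how Flink actually redistributes list entries across subtasks is not modeled. In a real operator, `onWatermark` would correspond to the work done in `processWatermark`, and `watermarkToReEmit` to the decision made during recovery.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

/**
 * Hypothetical model (not Flink code) of a watermark-replaying operator's state.
 * The List<Long> stands in for the operator's ListState<Long>.
 */
final class LastWatermarkBookkeeping {
    private final List<Long> state = new ArrayList<>();

    /** Normal processing: keep the state a singleton list holding the latest watermark. */
    void onWatermark(long timestamp) {
        state.clear();
        state.add(timestamp);
    }

    /** What this subtask would contribute to a checkpoint. */
    List<Long> snapshot() {
        return new ArrayList<>(state);
    }

    /**
     * Recovery: the restored list may hold exactly one entry (no rescaling),
     * no entries (scale-up: nothing to emit), or several entries (scale-down:
     * emit the minimum).
     */
    static OptionalLong watermarkToReEmit(List<Long> restoredEntries) {
        return restoredEntries.stream().mapToLong(Long::longValue).min();
    }
}
```

Taking the minimum is the conservative choice when entries from several subtasks are merged: the re-emitted watermark never claims more event-time progress than any of those subtasks had made before the failure.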

Best,
Piotrek

Re: How to make onTimer() trigger on a CoProcessFunction after a failure?

Felipe Gutierrez (Jun 18, 2021, 14:38)
> On Fri, Jun 18, 2021 at 1:41 PM Piotr Nowojski wrote:
> Keep in mind that this is a quite low level approach to this problem. It would be much better to make sure that after recovery watermarks are still being emitted.

Yes, indeed it looks very low-level. I did a small test that emits one watermark for the stream that was recovered, and then the join is processed. The behavior is the same with a CoGroupFunction and a CoProcessFunction. So in the end, I don't need to implement MyCoProcessFunction with checkpointing; I just need to emit a new watermark after the job recovers.

In my case, I am using a Kafka source, so if I make Kafka keep emitting watermarks, the problem is solved. Otherwise, I have to implement this custom operator.

Thanks for your answer!
Felipe

Re: How to make onTimer() trigger on a CoProcessFunction after a failure?

Piotr Nowojski
I'm glad I could help. I hope it solves your problem :)

Best,
Piotrek
