Hi community,
I have implemented a join function using CoProcessFunction with CheckpointedFunction to recover from failures. I added some debug lines to check whether it restores state, and it does. Before the crash, I process events that arrive at processElement2(). Snapshots are taken in snapshotState(), and when the application comes back it restores those events. That is fine.

After the restore, I process events that arrive at processElement1(). I register event-time timers for them, as I did in processElement2() before the crash, but onTimer() is never called. The point is that I have no more events to send to processElement2() that would make the CoProcessFunction register a timer for them; they were all sent before the crash. I suppose onTimer() is called only when "timerService.registerEventTimeTimer(endOfWindow);" has been called from both processElement1() and processElement2(), because when I run the same application without a crash, the CoProcessFunction does trigger onTimer(). But if there is a crash in the middle, the CoProcessFunction never calls onTimer().

Why is that? Is that normal? What do I have to do to make the CoProcessFunction trigger onTimer() even if only one stream is processed, let's say at processElement2(), and the other stream is restored from a snapshot? I imagine that I have to register a timer during recovery (in initializeState()). But how?

Thanks,
Felipe
Hi,

As far as I can tell, timers should be checkpointed and recovered. What may be happening is that the last watermarks seen by the operators, per input and per channel within an input, are not persisted. Flink assumes that watermark assigners will emit newer watermarks after the recovery. However, if one of your inputs is dormant and had already emitted some very high watermark long before the failure, and no new watermark is emitted on it after recovery, this input/input channel might be holding the operator's watermark back and preventing timers from firing.

Can you check if that's what's happening in your case? If so, you would have to make sure, one way or another, that some watermarks are emitted after recovery. As a last resort, you could manually store the watermarks in the operator's/source's state and re-emit the last seen watermark during recovery.

Best,
Piotrek

On Thu, Jun 17, 2021 at 13:46 Felipe Gutierrez <[hidden email]> wrote:
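To make the mechanism above concrete, here is a toy model in plain Java. It is not Flink code: the class `TwoInputWatermarkModel` and its methods are hypothetical names for illustration only. It assumes, as described above, that a two-input operator advances its event-time clock to the minimum of the watermarks seen on both inputs, and that after a restart both per-input watermarks start again from "nothing seen yet".

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Toy model of a two-input operator's event-time clock (illustration only, not Flink API). */
final class TwoInputWatermarkModel {
    // Per-input watermarks are not restored after a failure, so both start at the lowest value.
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;

    // Timers, in contrast, are assumed to survive the restart (they are checkpointed).
    private final List<Long> pendingTimers = new ArrayList<>();
    private final List<Long> firedTimers = new ArrayList<>();

    void registerEventTimeTimer(long timestamp) {
        pendingTimers.add(timestamp);
    }

    void onWatermark1(long watermark) {
        input1Watermark = Math.max(input1Watermark, watermark);
        fireDueTimers();
    }

    void onWatermark2(long watermark) {
        input2Watermark = Math.max(input2Watermark, watermark);
        fireDueTimers();
    }

    /** The operator's clock is held back by the slower (or silent) input. */
    long currentWatermark() {
        return Math.min(input1Watermark, input2Watermark);
    }

    List<Long> firedTimers() {
        return firedTimers;
    }

    // A timer fires once the combined watermark has reached its timestamp.
    private void fireDueTimers() {
        long current = currentWatermark();
        Iterator<Long> it = pendingTimers.iterator();
        while (it.hasNext()) {
            long timer = it.next();
            if (timer <= current) {
                firedTimers.add(timer);
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        TwoInputWatermarkModel op = new TwoInputWatermarkModel();
        op.registerEventTimeTimer(1_000L);
        op.onWatermark1(5_000L);              // only the live input advances
        System.out.println(op.firedTimers()); // nothing fired: input 2 is still silent
        op.onWatermark2(2_000L);              // one watermark on the dormant input
        System.out.println(op.firedTimers()); // the timer at 1000 has now fired
    }
}
```

In this model, a single watermark on the dormant input is enough to release the pending timers, which is why emitting any watermark after recovery would be expected to unblock the join.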
Hello Piotrek, On Fri, Jun 18, 2021 at 11:48 AM Piotr Nowojski <[hidden email]> wrote:
I think you are correct. At least when I reproduce the bug, it behaves as you said.

Could you please point out how I can checkpoint the watermarks on a source operator? Is it done by the code below, taken from here (https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector)?

    FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>("myTopic", schema, props);
    kafkaSource.assignTimestampsAndWatermarks(
        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)));

Thanks,
Felipe
Hi,

Keep in mind that this is quite a low-level approach to the problem. It would be much better to make sure that watermarks are still being emitted after recovery.

If you are using a built-in source, it's probably easier to do it in a custom operator. I would try to implement a custom one based on `AbstractStreamOperator`. Your class would also need to implement the `OneInputStreamOperator` interface. You could implement `processElement` as an identity function (just pass the stream element down unchanged). In `processWatermark` you would need to store the latest watermark in a `ListState<Long>` field (you can declare it inside `AbstractStreamOperator#initializeState` via `context.getOperatorStateStore().getListState(new ListStateDescriptor<>("your-field-name", Long.class));`). During normal processing (`processWatermark`), make sure it's a singleton list.

During recovery (`AbstractStreamOperator#initializeState()`) without rescaling, you would just access this state field and re-emit the only element on that list. However, depending on whether you are scaling up (a) or down (b) during recovery, you could sometimes have either (a) an empty list (in that case you cannot emit anything), or (b) many elements on the list (in that case you would need to calculate the minimum of all elements).

As the operator API is not a very official one, it's not well documented. For an example, you would need to look in the Flink code itself for existing implementations of `AbstractStreamOperator` or `OneInputStreamOperator`.

Best,
Piotrek

On Fri, Jun 18, 2021 at 12:49 Felipe Gutierrez <[hidden email]> wrote:
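The recovery rule described above (nothing to emit for an empty list, the single element for a singleton list, the minimum when several elements arrive after scaling down) can be sketched in plain Java. This is only an illustration of the decision logic, not an operator implementation: `RestoredWatermark`, `snapshotOf` and `watermarkToReEmit` are hypothetical names, and a plain `List<Long>` stands in for the contents of the restored `ListState<Long>`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;

/** Decision logic for re-emitting a stored watermark after recovery (illustration only). */
final class RestoredWatermark {

    /** During normal processing the state holds exactly one element: the latest watermark. */
    static List<Long> snapshotOf(long latestWatermark) {
        return Collections.singletonList(latestWatermark);
    }

    /**
     * Picks the watermark to re-emit from the restored list.
     * Empty list (possible after scaling up): nothing to emit.
     * One element (no rescaling): that element.
     * Several elements (possible after scaling down): the minimum, which is the safe lower bound.
     */
    static OptionalLong watermarkToReEmit(List<Long> restored) {
        return restored.stream().mapToLong(Long::longValue).min();
    }

    public static void main(String[] args) {
        System.out.println(watermarkToReEmit(Collections.<Long>emptyList()));
        System.out.println(watermarkToReEmit(snapshotOf(42L)));
        System.out.println(watermarkToReEmit(Arrays.asList(70L, 30L, 50L)));
    }
}
```

Taking the minimum is the conservative choice here: re-emitting a larger value could advance event time past data that one of the merged subtasks had not yet accounted for.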
On Fri, Jun 18, 2021 at 1:41 PM Piotr Nowojski <[hidden email]> wrote:
Yes, indeed it looks very low level. I did a small test that emits one watermark for the stream that was recovered, and then the join is processed. The behavior is the same with a CoGroupFunction and a CoProcessFunction. So in the end I don't need to implement MyCoProcessFunction with checkpointing; I just need to emit a new watermark after the job recovers. In my case, I am using a Kafka source, so if I make Kafka keep emitting watermarks, the problem is solved. Otherwise, I have to implement this custom operator.

Thanks for your answer!
Felipe
I'm glad I could help, I hope it will solve your problem :)

Best,
Piotrek

On Fri, Jun 18, 2021 at 14:38 Felipe Gutierrez <[hidden email]> wrote: