Hi All,
I have a job whose source is Kafka. Stream1 partitions the data on a dynamic key and is joined with static rules (Stream2, also read from Kafka). I use a KeyedCoProcessFunction to join Stream1 with Stream2. Everything works fine in a normal run. To change the watermark generation interval, I stopped the job with a savepoint. When I restart the job from the savepoint, the watermark is stuck at -9223372036854775808. Because of this, the process function doesn't emit any results. What could be the problem?
Thanks,
Hemant
Hi Hemant,
The latest seen watermarks are not persisted as state in the operators. Currently the DataStream API assumes that after recovery watermarks are going to be re-emitted sooner or later. What probably happens is that one of your sources emitted watermarks (maybe a very high one, or even `MAX_WATERMARK`) before the savepoint was taken, and then stopped emitting them. As long as the job is not restarted, this watermark is kept in memory. After recovery, however, all watermarks in the operators are reset to `MIN_WATERMARK` (-9223372036854775808), and in your case the watermark of one of the inputs to your `KeyedCoProcessFunction` is probably never updated after the recovery (for operators/functions with multiple inputs, the combined watermark is the minimum over all inputs). You would need to make sure, one way or another, that watermarks are emitted on every input after the recovery. As a last resort, you could probably implement an operator that remembers the last checkpointed watermark in its state and re-emits it upon recovery.
Best,
Piotrek
On Thu, 4 Mar 2021 at 15:43, bat man <[hidden email]> wrote:
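To make the explanation above concrete, here is a minimal sketch in plain Java. It is a toy model and not Flink code: the class `TwoInputWatermarkModel` and its methods are hypothetical names invented for illustration. It only assumes what the reply states, namely that a multi-input operator uses the minimum watermark over its inputs and that watermarks start at `Long.MIN_VALUE` again after a restore.

```java
import java.util.Arrays;

/** Toy model (not Flink code) of how a multi-input operator combines watermarks. */
class TwoInputWatermarkModel {
    // Same value as the "no watermark seen yet" marker mentioned in the thread.
    static final long MIN_WATERMARK = Long.MIN_VALUE;

    private final long[] inputWatermarks;

    TwoInputWatermarkModel(int numInputs) {
        inputWatermarks = new long[numInputs];
        Arrays.fill(inputWatermarks, MIN_WATERMARK);
    }

    /** Records a watermark from one input; a watermark never moves backwards. */
    void onWatermark(int input, long timestamp) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], timestamp);
    }

    /** The operator's event-time clock is the minimum over all inputs. */
    long combinedWatermark() {
        return Arrays.stream(inputWatermarks).min().getAsLong();
    }

    public static void main(String[] args) {
        TwoInputWatermarkModel beforeSavepoint = new TwoInputWatermarkModel(2);
        beforeSavepoint.onWatermark(0, 1_000L); // Stream1 (events)
        beforeSavepoint.onWatermark(1, 5_000L); // Stream2 (rules), then it goes quiet
        System.out.println(beforeSavepoint.combinedWatermark()); // prints 1000

        // Restore: watermarks are not part of the savepoint, so the model starts fresh.
        TwoInputWatermarkModel afterRestore = new TwoInputWatermarkModel(2);
        afterRestore.onWatermark(0, 2_000L); // only Stream1 emits watermarks again
        System.out.println(afterRestore.combinedWatermark()); // prints -9223372036854775808
    }
}
```

In this model the second input never reports a watermark after the restore, so the minimum stays at `Long.MIN_VALUE` no matter how far the first input advances, which matches the symptom described in the question.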
Thanks Piotr. Got it. I had to push the static rules to the Kafka topic again, as they had expired and been archived from the topic. After that, the pipeline resumed. Regarding your suggestion of implementing an operator that remembers the watermark: is there any indicator that the job has been resumed from a savepoint, which I could use to decide when to re-emit the watermark?
On Thu, Mar 4, 2021 at 8:46 PM Piotr Nowojski <[hidden email]> wrote:
Hi,
I think you could try implementing the `CheckpointedFunction` interface; `FunctionInitializationContext.isRestored` is an indicator for that. BTW: I am not very sure about your scenario, but maybe you could try setting the idleness configuration [1].
On Fri, Mar 5, 2021 at 2:19 AM bat man <[hidden email]> wrote:
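The idleness suggestion above can be illustrated with another small plain-Java sketch. Again this is a toy model and not Flink code: `IdleAwareWatermarkModel`, `markIdle` and the other names are hypothetical. It assumes that an input marked idle is left out of the minimum until it produces a watermark again; in Flink itself the corresponding setting is, to my knowledge, `WatermarkStrategy.withIdleness(Duration)`, and whether it fits a join against a rarely updated rules topic depends on the job.

```java
import java.util.Arrays;

/** Toy model (not Flink code): idle inputs are skipped when combining watermarks. */
class IdleAwareWatermarkModel {
    private final long[] inputWatermarks;
    private final boolean[] idle;
    private long combined = Long.MIN_VALUE; // last combined watermark, never regresses

    IdleAwareWatermarkModel(int numInputs) {
        inputWatermarks = new long[numInputs];
        idle = new boolean[numInputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    /** A new watermark makes the input active again. */
    void onWatermark(int input, long timestamp) {
        idle[input] = false;
        inputWatermarks[input] = Math.max(inputWatermarks[input], timestamp);
    }

    /** Stands in for an idleness timeout firing on this input. */
    void markIdle(int input) {
        idle[input] = true;
    }

    /** Minimum over active inputs only; holds the old value if every input is idle. */
    long combinedWatermark() {
        long minActive = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < inputWatermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                minActive = Math.min(minActive, inputWatermarks[i]);
            }
        }
        if (anyActive) {
            combined = Math.max(combined, minActive);
        }
        return combined;
    }

    public static void main(String[] args) {
        IdleAwareWatermarkModel afterRestore = new IdleAwareWatermarkModel(2);
        afterRestore.onWatermark(0, 2_000L); // Stream1 emits, the rules stream is silent
        System.out.println(afterRestore.combinedWatermark()); // prints -9223372036854775808
        afterRestore.markIdle(1); // the rules input is declared idle
        System.out.println(afterRestore.combinedWatermark()); // prints 2000
    }
}
```

The trade-off in this model is that, once the rules input is idle, event time advances on the event stream alone, so timers may fire before a late rule update arrives.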