Hi,

I have a question about implementing a SourceFunction. I'm trying to implement my own SourceFunction using Flink 1.11.3. The javadoc says that a SourceFunction which implements CheckpointedFunction should use a synchronized block so that checkpointing and emission of elements are performed atomically. On the other hand, from the implementation of StreamSourceContexts and StreamTaskActionExecutor, it looks like SourceContext.collect and checkpointing are already mutually exclusive. I'm sorry if I have misunderstood.

My question is: is the synchronized block in the SourceFunction necessary, and if so, why?

Best regards,
Kazunori Shinhira
Hi Kazunori,

The checkpoint lock is acquired preemptively inside the `SourceContext#collect` call, which covers the case where the source is stateless. However, this is not enough if you are implementing a stateful `SourceFunction`: if you modify state in your source function outside the scope of the checkpoint lock, those updates happen concurrently with the `org.apache.flink.streaming.api.operators.StreamOperator#snapshotState` call, and the snapshot could capture inconsistent state.

`StreamOperator#snapshotState` is one example of the actions that are executed in the `StreamTaskActionExecutor`. Depending on the execution context, that is, whether or not it runs in a source task, `StreamTaskActionExecutor` either does or does not acquire the checkpoint lock.

Also please note that `SourceFunction` is currently (as of Flink 1.12.0) being phased out in favor of the new `org.apache.flink.api.connector.source.Source` interface. Among other improvements, this newer interface has a single-threaded execution model, without a dedicated thread to run the source code, so there is no need for the checkpoint lock.

Best,
Piotrek

On Mon, 18 Jan 2021 at 13:05, Kazunori Shinhira <[hidden email]> wrote:
Hi Piotrek,

Thank you for your reply. I understand now that synchronization on the checkpoint lock is needed to make state modification and checkpointing mutually exclusive.

As I understand it, in a SourceFunction for Kafka, for example, the code that fetches records and updates the current offset (which is the state of the Kafka SourceFunction) has to be enclosed in a synchronized block. If we don't use a synchronized block, `StreamOperator#snapshotState` can be called concurrently with the state modification. As a result, the source may save a wrong offset and lose records if the job is restarted at that moment. Is my understanding correct?

Thank you for the information on the new Source interface. I'll look into how to implement it.

Best,
Kazunori Shinhira

On Mon, 18 Jan 2021 at 23:45, Piotr Nowojski <[hidden email]> wrote:
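The offset scenario described above can be modelled in plain Java, without any Flink dependency. This is only a sketch under stated assumptions: the class `CheckpointLockSketch`, its methods, and the in-memory `emitted` list are hypothetical stand-ins, and `checkpointLock` plays the role of the object that `SourceContext#getCheckpointLock()` returns in a real `SourceFunction`. One thread emits a record and advances the offset inside the lock, while a second thread takes "snapshots" under the same lock and checks that the offset always matches the number of emitted records.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the pattern; none of these names are Flink APIs.
final class CheckpointLockSketch {
    // Stand-in for the lock a real source would get from SourceContext#getCheckpointLock().
    private final Object checkpointLock = new Object();
    // Stand-in for records handed to SourceContext#collect.
    private final List<Long> emitted = new ArrayList<>();
    // The source's state: the next offset to read.
    private long offset = 0L;
    // Written only by the snapshot thread, read after both threads have finished.
    private long inconsistentSnapshots = 0L;

    // Emission and state update happen together under the lock.
    void emitNext() {
        synchronized (checkpointLock) {
            emitted.add(offset);
            offset++;
        }
    }

    // Models snapshotState: it holds the same lock, so it can never
    // observe a record that was emitted without its offset update.
    void snapshot() {
        synchronized (checkpointLock) {
            if (offset != emitted.size()) {
                inconsistentSnapshots++;
            }
        }
    }

    // Runs a source thread and a checkpoint thread concurrently and
    // returns how many snapshots saw offset and emitted records disagree.
    static long run(int records, int snapshots) {
        CheckpointLockSketch source = new CheckpointLockSketch();
        Thread sourceThread = new Thread(() -> {
            for (int i = 0; i < records; i++) {
                source.emitNext();
            }
        });
        Thread checkpointThread = new Thread(() -> {
            for (int i = 0; i < snapshots; i++) {
                source.snapshot();
            }
        });
        sourceThread.start();
        checkpointThread.start();
        try {
            sourceThread.join();
            checkpointThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return source.inconsistentSnapshots;
    }
}
```

Calling `CheckpointLockSketch.run(20000, 2000)` should return 0 because both methods synchronize on the same object. If the `synchronized` block were removed from `emitNext()`, a snapshot could run between `emitted.add(offset)` and `offset++`, which is the kind of race the thread is about.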
Hi,

Yes, exactly :)

> As a result, the source may save a wrong offset and lose records if the job is restarted at that moment.

This is just one of the possible race conditions. As offsets are probably 64-bit integers, I'm pretty sure corrupted reads/writes can also happen, where only half of the bits (for example the lower 32 bits) have been updated at the time of the checkpoint.

Best,
Piotrek

On Tue, 19 Jan 2021 at 02:22, Kazunori Shinhira <[hidden email]> wrote:
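The half-updated offset mentioned above can be illustrated deterministically. The sketch below does not reproduce a real JVM race; it is a single-threaded model in which a 64-bit value is stored as two 32-bit halves that are written one after the other, which the Java Language Specification (§17.7) permits for non-volatile `long` and `double` fields. The class `TornWriteSketch` and its methods are hypothetical names chosen for illustration.

```java
// Single-threaded model of a torn 64-bit write; not a real concurrency test.
final class TornWriteSketch {
    // The 64-bit offset, modelled as two separately written 32-bit halves.
    private int high;
    private int low;

    // Reassembles the 64-bit value from whatever the halves currently hold.
    private long read() {
        return ((long) high << 32) | (low & 0xFFFFFFFFL);
    }

    // Stores oldValue, then starts writing newValue lower half first and
    // "snapshots" the value before the upper half has been written.
    static long snapshotBetweenHalves(long oldValue, long newValue) {
        TornWriteSketch state = new TornWriteSketch();
        state.high = (int) (oldValue >>> 32);
        state.low = (int) oldValue;

        state.low = (int) newValue;           // first half of the update
        long observed = state.read();         // checkpoint reads here
        state.high = (int) (newValue >>> 32); // second half of the update
        return observed;
    }
}
```

With an old offset of 4294967295 (0x00000000FFFFFFFF) and a new offset of 4294967296 (0x0000000100000000), `snapshotBetweenHalves` returns 0, a value that is neither the old nor the new offset. Holding the checkpoint lock around the update rules out this interleaving.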
Hi,

Thank you for your explanation. I now understand the need for the checkpoint lock :)

Best,
Kazunori Shinhira

On Tue, 19 Jan 2021 at 18:00, Piotr Nowojski <[hidden email]> wrote:
No problem :)

Piotrek

On Wed, 20 Jan 2021 at 02:12, Kazunori Shinhira <[hidden email]> wrote: