Hello members,
I am new to the Apache Flink world, and over the last month I have been exploring the test scenarios offered by the Flink team and different books to learn Flink.

Today I was trying to better understand this test, which you can find here:
<http://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java>

I will try to explain how I understand it, and maybe you can point out the problems in my logic.

Testing parameters:
delay = 1L;
maxAttempts = 3;
stateBackend = FsStateBackend

RandomLongSource:
First we create a data source implementing the CheckpointedFunction interface. If the attempt number is higher than the maximum number of allowed attempts, we emit the last event and shut down the source; otherwise, we continue emitting events.
1.1 Why do we need maxAttempts in this scenario? Is that the number of times we allow the application to fail?
The initializeState method is called every time the user-defined function is initialized, whether that is when the function is first initialized or when it is actually recovering from an earlier checkpoint. [1]

StateCreatingFlatMap:
After implementing the source, we use the flat map operator to generate failure scenarios and test how Flink handles them. We kill TaskManagers using the halt method if the PID corresponds to the PID we decided to kill.
In the initializeState method, we handle how the recovery is done, and if the state was previously restored we capture the information about it.

This is my understanding of the test source code, but it is not clear to me how it really works and whether I am capturing the real scenario correctly.
I decided to test it using 1 JobManager and 3 TaskManagers (even though the max operator parallelism is 1).

The application starts running and is checkpointed continuously. At some point the task is killed and the application is restored from the last saved checkpoint. If the application has 4 failures (more than the 3 allowed attempts), then the application finishes successfully. Is that correct?

2.1 Is this how the logic of the scenario works?
2.2 Is this an example of fault tolerance using checkpoints?

I will upload screenshots of the UI dashboard and an exception that I don't really understand; in some forums I read that it was a problem with the JobManager heap size.

I apologize if my question is not well formatted or if it sounds stupid.
Best regards

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2744/fail-1.png>
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2744/fail-2.png>
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2744/fail-3.png>

[1] <http://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-operator-state>
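The restart loop described in the question can be modelled without Flink at all. The following is only a minimal sketch under stated assumptions, not the code of the actual test: the class RecoveryScenarioSketch, its run method, and the parameters eventsPerAttempt and checkpointInterval are hypothetical names introduced for illustration. It assumes that attempt numbers start at 0, that every attempt before the final one is killed, and that a restart resumes from the last completed checkpoint.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, Flink-free model of the checkpoint/kill/restore loop. */
final class RecoveryScenarioSketch {

    /** What the simulated job looked like when the source finally shut down. */
    static final class Outcome {
        final int finalAttempt;      // attempt number in which the source finished
        final long restoredCounter;  // counter value restored at that attempt
        final List<String> log;      // one line per attempt

        Outcome(int finalAttempt, long restoredCounter, List<String> log) {
            this.finalAttempt = finalAttempt;
            this.restoredCounter = restoredCounter;
            this.log = log;
        }
    }

    /**
     * Simulates attempts 0, 1, 2, ... Each attempt restores the counter from the
     * last checkpoint. Once the attempt number exceeds maxAttempts, the source
     * emits its final event and stops; otherwise it emits events and is killed.
     */
    static Outcome run(int maxAttempts, int eventsPerAttempt, int checkpointInterval) {
        long checkpointedCounter = 0; // stands in for the last completed checkpoint
        List<String> log = new ArrayList<>();

        for (int attempt = 0; ; attempt++) {
            long counter = checkpointedCounter; // restore from the last checkpoint

            if (attempt > maxAttempts) {
                log.add("attempt " + attempt + ": emit final event, source finishes");
                return new Outcome(attempt, counter, log);
            }

            for (int i = 0; i < eventsPerAttempt; i++) {
                counter++;
                if (counter % checkpointInterval == 0) {
                    checkpointedCounter = counter; // a checkpoint completes
                }
            }
            // Assumption: the task is killed here, so progress after the last
            // checkpoint is lost and has to be replayed on the next attempt.
            log.add("attempt " + attempt + ": killed at counter " + counter
                    + ", last checkpoint " + checkpointedCounter);
        }
    }

    public static void main(String[] args) {
        Outcome outcome = run(3, 10, 4);
        outcome.log.forEach(System.out::println);
    }
}
```

With maxAttempts = 3, this model runs attempts 0 through 3, each ending in a kill (4 failures), and the source shuts down cleanly in attempt 4, which matches the count in the question.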
2.1) Overall, you seem to be on the right track.
- 1.1) maxAttempts is used by the source to initiate a clean shutdown; if the source(s) of a streaming job shut down, the rest of the pipeline does too once there is no more data to process.
- The exception you see is expected; we are killing a task executor, so it no longer responds to heartbeat requests from the job manager, and those heartbeats time out.
- 2.2) The test does rely on data being checkpointed and accessible after a job failure and restart, so yes.

On 9/23/2020 4:05 PM, cokle wrote:
> [...]
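To make the heartbeat explanation concrete, here is a small sketch of how a timeout-based liveness check behaves when one process is killed. It is not Flink's heartbeat implementation: the class HeartbeatMonitorSketch, its methods, the IDs "tm-1" and "tm-2", and the millisecond values are all hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical heartbeat monitor; timestamps are passed in to keep it deterministic. */
final class HeartbeatMonitorSketch {
    private final long timeoutMillis;
    private final Map<String, Long> lastSeen = new HashMap<>();

    HeartbeatMonitorSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Records that the given target answered a heartbeat request at nowMillis. */
    void recordHeartbeat(String targetId, long nowMillis) {
        lastSeen.put(targetId, nowMillis);
    }

    /** Returns, in sorted order, all targets whose last answer is older than the timeout. */
    List<String> timedOut(long nowMillis) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastSeen.entrySet()) {
            if (nowMillis - entry.getValue() > timeoutMillis) {
                result.add(entry.getKey());
            }
        }
        Collections.sort(result);
        return result;
    }

    public static void main(String[] args) {
        HeartbeatMonitorSketch monitor = new HeartbeatMonitorSketch(50);
        monitor.recordHeartbeat("tm-1", 0);
        monitor.recordHeartbeat("tm-2", 0);
        // "tm-2" is killed and never answers again; "tm-1" keeps answering.
        monitor.recordHeartbeat("tm-1", 40);
        System.out.println("timed out at t=60: " + monitor.timedOut(60));
    }
}
```

In this model the killed process is reported as timed out only after the timeout has elapsed, which is why a deliberate kill shows up as a heartbeat timeout exception rather than as an immediate failure.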