Hello,
I'm writing a test for my custom sink function. The function is stateful and relies on checkpoint restores to stay consistent with the external system it writes to. For integration testing of the sink function, I have a MiniCluster-based environment inside a single JVM through which I create my job and validate its operation. In order to test the checkpoint-restore behavior with precision, I've disabled checkpointing and am using savepoints instead. My test proceeds as follows:

1. Start a job.
2. Push some data through it to the sink and on to the external system.
3. Trigger a savepoint.
4. Push more data.
5. Cancel the job.
6. Restore from the savepoint captured in step 3.

I can't seem to find a Java API for restoring a job from a savepoint. The approach in the documentation and other resources is to use the CLI, which is not an option for me. Currently, I create a RemoteStreamEnvironment with savepointRestoreSettings set:

    var savepointDir =

but when I run execute(), I get the following error:

    java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.

Separately, is there a Flink testing utility I could use for integration testing of state checkpointing and recovery?

Thanks,
Abhishek
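A minimal sketch of the restore attempt described above (the truncated savepointDir value, the host and port, and buildMyPipeline are placeholder assumptions):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
    import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;

    var savepointDir = "/tmp/savepoints/savepoint-abc123";  // placeholder path

    var env = new RemoteStreamEnvironment(
            "localhost", 8081,                   // placeholder host/port
            new Configuration(),
            new String[0],                       // jar files
            null,                                // global classpaths
            SavepointRestoreSettings.forPath(savepointDir));

    // The "No operators defined in streaming topology" error means execute()
    // ran before any sources or operators were added to this new environment;
    // the job's topology has to be declared again on env before executing.
    // buildMyPipeline(env);  // hypothetical helper that rebuilds the topology
    env.execute();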
You can study LocalStreamingFileSinkTest [1] for an example of how to approach this. You can use the test harnesses [2], keeping in mind that:

- initializeState is called during instance creation
- the provided context indicates if state is being restored from a snapshot
- snapshot is called when taking a checkpoint
- notifyOfCompletedCheckpoint is called when a checkpoint is complete

The outline of such a test might follow this pattern:

    testHarness1.setup();
    testHarness1.initializeState(initState);
    testHarness1.open();

    // setup state to checkpoint
    ...

    // capture snapshot
    snapshot = testHarness1.snapshot(checkpointId, timestamp);

    // process more data, the effects of which will be lost
    ...

    // create a new test harness initialized with the state from the snapshot
    testHarness2.setup();
    testHarness2.initializeState(snapshot);
    testHarness2.open();

    // verify the state
    ...

David

On Thu, Jun 11, 2020 at 12:12 PM Abhishek Rai <[hidden email]> wrote:
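A more concrete, self-contained version of that outline might look like the following sketch, where MyStatefulSink is a hypothetical SinkFunction<String> and the harness classes come from the flink-streaming-java test jar:

    import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
    import org.apache.flink.streaming.api.operators.StreamSink;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;

    // first harness: build up some state, then snapshot it
    OneInputStreamOperatorTestHarness<String, Object> harness1 =
            new OneInputStreamOperatorTestHarness<>(new StreamSink<>(new MyStatefulSink()));
    harness1.setup();
    harness1.open();

    harness1.processElement(new StreamRecord<>("a", 1L));
    harness1.processElement(new StreamRecord<>("b", 2L));

    // snapshot(checkpointId, timestamp) captures the operator's state
    OperatorSubtaskState snapshot = harness1.snapshot(0L, 2L);

    // the effects of this element will be lost on restore
    harness1.processElement(new StreamRecord<>("lost", 3L));
    harness1.close();

    // second harness: restore from the snapshot, as after a recovery
    OneInputStreamOperatorTestHarness<String, Object> harness2 =
            new OneInputStreamOperatorTestHarness<>(new StreamSink<>(new MyStatefulSink()));
    harness2.setup();
    harness2.initializeState(snapshot);
    harness2.open();
    harness2.notifyOfCompletedCheckpoint(0L);

    // now assert that the external system reflects "a" and "b" but not "lost"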
Hi Abhishek,

I did the same as you and tested my job, which writes Parquet via a StreamingFileSink, using a savepoint (and afterwards ran a small Spark job on the Parquet output, asserting that my Flink output is correct).

The good news for you is that it is easily possible to stop the job with a savepoint. You are already on the right track. First you build your MiniCluster like so:

    private MiniClusterWithClientResource buildTestMiniCluster(Configuration flinkClusterConfig) throws Exception { ... }

Then you can obtain a detached cluster client from it:

    private ClusterClient<?> getDetachedClusterClient(MiniClusterWithClientResource flinkTestCluster) { ... }

Now, in your test, you build the job graph and submit the job:

    JobGraph jobGraph = env.getStreamGraph().getJobGraph();

Afterwards, you can easily stop the job with a savepoint via that client:

    flinkClient.stopWithSavepoint(submissionResult.getJobID(), false, savepointDir.toURI().toString());

In my case, I store the savepoint in an ignored JUnit TemporaryFolder, because I only care about the written Parquet file, not the savepoint itself.

That's all nice. The not-so-nice part is that you can't easily tell when the job has actually processed all elements, i.e. when it is safe to trigger stopping the pipeline with the savepoint. For this purpose, I use a public static Semaphore in each of my integration tests with savepoints. As all my jobs read from a Kafka source (using com.salesforce.kafka.test.junit4.SharedKafkaTestResource), I have a custom KafkaDeserializationSchema extending the default one from my job, in which I implement isEndOfStream (a sketch of this schema follows below).

Finally, I finish writing the test data in my test setup with a marker message "EOF". When that is received, I release the semaphore, and the unit-test thread executes the flinkClient.stopWithSavepoint line. This works because the MiniCluster runs in the same JVM, and because the savepoint barrier flows through the pipeline and can't overtake my prior messages: all data will have been written, and I can run my assertions right after the stopWithSavepoint call, as it runs synchronously.

Hope that helps.

Best regards,
Theo
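The helper methods above might be fleshed out roughly like this. This is a sketch against Flink 1.10-era test utilities (flink-test-utils); the TaskManager/slot counts are arbitrary, and the future-based client calls are assumptions that differ between Flink versions:

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.client.program.ClusterClient;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.jobgraph.JobGraph;
    import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
    import org.apache.flink.test.util.MiniClusterWithClientResource;

    private MiniClusterWithClientResource buildTestMiniCluster(Configuration flinkClusterConfig) throws Exception {
        MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(
                new MiniClusterResourceConfiguration.Builder()
                        .setConfiguration(flinkClusterConfig)
                        .setNumberTaskManagers(1)
                        .setNumberSlotsPerTaskManager(2)
                        .build());
        cluster.before();  // start the cluster (normally done by the JUnit @Rule lifecycle)
        return cluster;
    }

    private ClusterClient<?> getDetachedClusterClient(MiniClusterWithClientResource flinkTestCluster) {
        // the resource exposes a client that is connected to the MiniCluster
        return flinkTestCluster.getClusterClient();
    }

The submit-then-stop flow in the test itself would then be roughly:

    JobGraph jobGraph = env.getStreamGraph().getJobGraph();
    JobID jobId = flinkClient.submitJob(jobGraph).get();  // non-blocking submission

    // ... wait until all test data has been processed (see the semaphore trick below) ...

    String savepointPath = flinkClient
            .stopWithSavepoint(jobId, false, savepointDir.toURI().toString())
            .get();  // blocks until the savepoint completes, so assertions can follow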
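And a sketch of the marker-based deserialization schema. The class name, the wrapped delegate, and the detail that isEndOfStream keeps returning false (stop-with-savepoint needs the job to still be running) are assumptions layered on the description above:

    import java.util.concurrent.Semaphore;

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    public class EofSignallingSchema implements KafkaDeserializationSchema<String> {

        // released once the "EOF" marker has passed through the source, telling
        // the test thread that it may now call stopWithSavepoint
        public static final Semaphore EOF_SEEN = new Semaphore(0);

        private final KafkaDeserializationSchema<String> delegate;

        public EofSignallingSchema(KafkaDeserializationSchema<String> delegate) {
            this.delegate = delegate;
        }

        @Override
        public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
            return delegate.deserialize(record);
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            if ("EOF".equals(nextElement)) {
                EOF_SEEN.release();
            }
            // never end the stream here: the job must still be running
            // when the test calls stopWithSavepoint
            return false;
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return Types.STRING;
        }
    }

The test thread blocks on EofSignallingSchema.EOF_SEEN.acquire() before calling stopWithSavepoint; the static semaphore is visible to both the Kafka source and the test only because the MiniCluster runs inside the same JVM.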