Restore from savepoint through Java API

Abhishek Rai
Hello,

I'm writing a test for my custom sink function.  The function is stateful and relies on checkpoint restores for maintaining consistency with the external system that it's writing to.  For integration testing of the sink function, I have a MiniCluster based environment inside a single JVM through which I create my job and validate its operation.

In order to test the checkpoint restore behavior with precision, I've disabled checkpointing and am instead using savepoints.  So, my test proceeds as follows:

1. Start a job.
2. Push some data through it to the sink and to an external system.
3. Trigger a savepoint.
4. Push more data.
5. Cancel the job.
6. Restore from the savepoint captured in step 3 above.

I can't seem to find a Java API for restoring a job from a savepoint.  The approach in the documentation and other resources is to use the CLI, which is not an option for me.  Currently, I create a RemoteStreamEnvironment with savepointRestoreSettings set, but when I run execute(), I get the following error:

java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.

var savepointDir =
    restClusterClient_.triggerSavepoint(jobId, tmpdir).get();
assertTrue(!savepointDir.isBlank());
// Cancel the job and launch a new one from the savepoint.
restClusterClient_.cancel(jobId).get();
var restoreSettings = SavepointRestoreSettings.forPath(savepointDir);
var env = new RemoteStreamEnvironment(
    flinkMiniCluster_.host(),
    flinkMiniCluster_.port(),
    null,
    new String[] {},
    null,
    restoreSettings);
var restoredJob = env.executeAsync();

Separately, is there a Flink testing utility I could use for integration testing of state checkpointing and recovery?

Thanks,
Abhishek
Re: Restore from savepoint through Java API

David Anderson-3
You can study LocalStreamingFileSinkTest [1] for an example of how to approach this. You can use the test harnesses [2], keeping in mind that:

- initializeState is called during instance creation
- the provided context indicates if state is being restored from a snapshot
- snapshot is called when taking a checkpoint
- notifyOfCompletedCheckpoint is called when a checkpoint is complete

The outline of such a test might follow this pattern:

testHarness1.setup();
testHarness1.initializeState(initState);
testHarness1.open();

// setup state to checkpoint ...

// capture snapshot
snapshot = testHarness1.snapshot(checkpointId, timestamp);

// process more data, the effects of which will be lost ...

// create a new test harness initialized with the state from the snapshot
testHarness2.setup();
testHarness2.initializeState(snapshot);
testHarness2.open();

// verify the state ...
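
The outline above can be modelled without any Flink dependency to make the expected outcome concrete. The sketch below is not the Flink test harness API: ToySink, snapshot() and restore() are hypothetical stand-ins for the sink function, testHarness1.snapshot(...) and testHarness2.initializeState(snapshot), and the "state" is simply a list of buffered records. It only illustrates what such a test would assert: anything processed after the snapshot is absent after the restore.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the snapshot/restore test pattern; no Flink classes are used.
final class SnapshotRestoreSketch {

    /** Hypothetical stand-in for a stateful sink: its state is the list of buffered records. */
    static final class ToySink {
        private final List<String> buffered = new ArrayList<>();

        void invoke(String record) {
            buffered.add(record);
        }

        /** Returns an immutable copy of the current state, as a checkpoint would. */
        List<String> snapshot() {
            return List.copyOf(buffered);
        }

        /** Builds a fresh instance whose state is taken from a snapshot. */
        static ToySink restore(List<String> snapshot) {
            ToySink sink = new ToySink();
            sink.buffered.addAll(snapshot);
            return sink;
        }

        List<String> state() {
            return List.copyOf(buffered);
        }
    }

    /** Runs the scenario and returns the state seen by the restored instance. */
    static List<String> runScenario() {
        ToySink first = new ToySink();
        first.invoke("a");
        first.invoke("b");
        List<String> snapshot = first.snapshot();   // capture snapshot
        first.invoke("c");                          // processed after the snapshot, so it is lost
        ToySink second = ToySink.restore(snapshot); // new instance initialized from the snapshot
        return second.state();
    }

    public static void main(String[] args) {
        System.out.println(runScenario()); // prints [a, b]
    }
}
```

In a real harness-based test, the same three phases (process, snapshot, restore into a second harness) apply, and the assertion would also cover what the sink wrote to the external system.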

David


In reply to Abhishek Rai's message of Thu, Jun 11, 2020 at 12:12 PM.
Re: Restore from savepoint through Java API

Theo
Hi Abhishek,

I did the same as you and tested my job, which uses a Parquet StreamingFileSink, via a snapshot. (Afterwards I run a small Spark job on the Parquet output to assert that my Flink output is correct.)

The good news is that it is easy to stop the job with a savepoint. You are already on the right track.

After you build your MiniCluster like so:

private MiniClusterWithClientResource buildTestMiniCluster(Configuration flinkClusterConfig) throws Exception {
    MiniClusterWithClientResource flinkTestCluster = new MiniClusterWithClientResource(
        new MiniClusterResourceConfiguration.Builder()
            .setConfiguration(flinkClusterConfig)
            .setNumberTaskManagers(1)
            .setNumberSlotsPerTaskManager(1)
            .build());
    flinkTestCluster.before();
    return flinkTestCluster;
}

you can get a detached cluster client from it:

private ClusterClient<?> getDetachedClusterClient(MiniClusterWithClientResource flinkTestCluster) {
    ClusterClient<?> flinkClient = flinkTestCluster.getClusterClient();
    flinkClient.setDetached(true);
    return flinkClient;
}

Now, in your test, you build the JobGraph and submit the job:
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
JobSubmissionResult submissionResult = flinkClient.submitJob(jobGraph, getClass().getClassLoader());

Afterwards, you can easily stop the job with a savepoint via that client:
flinkClient.stopWithSavepoint(submissionResult.getJobID(), false, savepointDir.toURI().toString()); 

In my case, I store the savepoint in a JUnit TemporaryFolder and otherwise ignore it, because I only care about the written Parquet file, not the savepoint itself.

That's all nice. The not-so-nice part is that you can't easily tell when the job has actually processed all of your input, i.e. when it is safe to stop the pipeline with the savepoint. For this purpose, each of my savepoint-based integration tests uses a public static Semaphore. As all my jobs read from a Kafka source (using com.salesforce.kafka.test.junit4.SharedKafkaTestResource), I have a custom KafkaDeserializationSchema that extends the default one from my job and overrides isEndOfStream:

@Override
public boolean isEndOfStream(V nextElement) {
    if ("EOF".equals(nextElement.getId())) {
        LOCK.release();
    }
    return false;
}

Finally, I finish writing the test data in my test setup with a marker message "EOF". When that message is received, the semaphore is released and the unit test thread executes the flinkClient.stopWithSavepoint line. This works because the MiniCluster runs in the same JVM as the test, and because the savepoint/checkpoint marker flows through the pipeline behind my earlier messages and cannot overtake them. All data is therefore written by the time the savepoint completes, and since stopWithSavepoint runs synchronously, I can run my assertions right after that line.
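
The handshake described above can be sketched in plain Java, with no Flink or Kafka classes involved. Everything here is an illustrative assumption: EofSemaphoreSketch, SEEN and runAndAwaitEof are hypothetical names, a worker thread stands in for the pipeline running inside the MiniCluster, and the records are reduced to their String ids. The semaphore starts with zero permits, so the test thread blocks until the marker has passed through isEndOfStream.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Plain-Java model of the "EOF marker releases a semaphore" handshake.
final class EofSemaphoreSketch {

    // Zero permits initially: acquiring blocks until the marker has been seen.
    static final Semaphore LOCK = new Semaphore(0);

    // Record ids observed by the stand-in pipeline, in arrival order.
    static final List<String> SEEN = new CopyOnWriteArrayList<>();

    /** Mirrors the isEndOfStream hook: signal on the marker, but never end the stream. */
    static boolean isEndOfStream(String id) {
        if ("EOF".equals(id)) {
            LOCK.release();
        }
        return false;
    }

    /** Feeds the input through a worker thread; returns true if the marker arrived in time. */
    static boolean runAndAwaitEof(List<String> input, long timeoutSeconds) {
        Thread pipeline = new Thread(() -> {
            for (String id : input) {
                SEEN.add(id);
                isEndOfStream(id);
            }
        });
        pipeline.start();
        try {
            // A timeout keeps a broken pipeline from hanging the test forever.
            boolean reached = LOCK.tryAcquire(timeoutSeconds, TimeUnit.SECONDS);
            // This is the point where the real test would call flinkClient.stopWithSavepoint(...).
            pipeline.join();
            return reached;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        boolean reached = runAndAwaitEof(List.of("1", "2", "EOF"), 5);
        System.out.println(reached + " " + SEEN);
    }
}
```

The static fields only work because the test and the pipeline share one JVM, which is the same property the MiniCluster setup relies on. Using tryAcquire with a timeout rather than a bare acquire is a choice made for this sketch, not something the original test is known to do.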

Hope that helps.

Best regards
Theo




In reply to David Anderson's message of Friday, June 12, 2020, 20:21:07.