Test sink behaviour


Test sink behaviour

David Magalhães
Hi, I've created a CustomSink that writes Parquet files to S3. Inside the `invoke` method I have a loop that checks whether S3 is down, and if it is, it waits with exponential backoff until S3 is online again.
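
A minimal sketch of what such a sink could look like (just a reading of the description above; the helpers `isS3Available`/`writeToS3`, the metric name and the backoff values are hypothetical placeholders, not from this post):

```scala
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

class ParquetS3Sink[T] extends RichSinkFunction[T] {

  @transient private var s3Errors: Counter = _

  override def open(parameters: Configuration): Unit = {
    // Expose the number of failed S3 attempts as a counter metric
    s3Errors = getRuntimeContext.getMetricGroup.counter("s3Errors")
  }

  override def invoke(value: T): Unit = {
    var backoffMs = 1000L
    // Wait with exponential backoff while S3 is unreachable
    while (!isS3Available()) {
      s3Errors.inc()
      Thread.sleep(backoffMs)
      backoffMs = math.min(backoffMs * 2, 60000L)
    }
    writeToS3(value)
  }

  // Placeholders for the actual S3 availability check and Parquet write
  private def isS3Available(): Boolean = ???
  private def writeToS3(value: T): Unit = ???
}
```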

Now I want to write a test for this. I can execute everything and see that the sink is doing what it is supposed to do, but I don't have a way to validate that programmatically (in an integration test).

One of the possibilities I was considering was to check the LazyLogger errors, to verify that something was printed, but I can't mock the Logger, since it is final. Since I expose the number of errors as a counter, I was trying to find a way to access it directly from Scala, but the only way I could find was via the REST API, and that is kind of a hack.

Example:

- Get the REST API port with flinkCluster.getClusterClient.getFlinkConfiguration.getInteger("rest.port", 0)
- Get the jobId via http://localhost:61869/jobs/
- Get the vertexId via http://localhost:61869/jobs/c3879cca4ba23ad734b2810ba0d73873
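
A rough sketch of how those pieces could be glued together in a test helper (the metrics endpoint path and its `?get=` parameter, as well as the metric name `s3Errors`, are assumptions here, not taken from the steps above):

```scala
import scala.io.Source

// Hypothetical integration-test helper: reads a counter exposed by the sink
// through the REST API of the local cluster. The metric name "s3Errors" and
// the ?get= query parameter are assumptions.
def fetchSinkErrorCounter(restPort: Int, jobId: String, vertexId: String): String = {
  val url =
    s"http://localhost:$restPort/jobs/$jobId/vertices/$vertexId/metrics?get=s3Errors"
  val source = Source.fromURL(url)
  try source.mkString // raw JSON, e.g. [{"id":"s3Errors","value":"3"}]
  finally source.close()
}
```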

Is there a better way to get the metric or to test this?

Thanks

Re: Test sink behaviour

Till Rohrmann
Hi David,

If you want to test the behaviour together with S3, then you could check that S3 contains a file after the job has completed.
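
For example, a quick sketch of such a check, assuming the AWS SDK for Java (v1) is on the test classpath; the bucket name and prefix are placeholders for wherever the sink writes:

```scala
import com.amazonaws.services.s3.AmazonS3ClientBuilder

// After the job has completed, assert that the sink actually produced output.
// "my-test-bucket" and "output/" stand in for the sink's target location.
val s3 = AmazonS3ClientBuilder.defaultClient()
val summaries = s3.listObjectsV2("my-test-bucket", "output/").getObjectSummaries
assert(!summaries.isEmpty)
```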

If you want to test the failure-and-retry behaviour, then I would suggest introducing your own abstraction for the S3 access which you can control. That way you can provide a testing implementation which imitates the described behaviour (first being unavailable and later becoming reachable again). With that in place you should be able to test the behaviour pretty well without having to access metrics.
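
Sketching what such a seam could look like (the names and the counting logic are only illustrative, not part of Flink or of the suggestion above):

```scala
import java.util.concurrent.atomic.AtomicInteger

// A hypothetical seam between the sink and S3 so that a test can swap in a
// controllable implementation instead of talking to the real service.
trait S3Access extends Serializable {
  def isAvailable: Boolean
  def write(bytes: Array[Byte], key: String): Unit
}

// Test double: reports S3 as "down" for the first `failures` checks, then as
// reachable. Counters live in the companion object so they are shared even
// after the sink instance is serialized into the (mini) cluster.
class FlakyS3Access(failures: Int) extends S3Access {
  override def isAvailable: Boolean =
    FlakyS3Access.checks.incrementAndGet() > failures

  override def write(bytes: Array[Byte], key: String): Unit =
    FlakyS3Access.writes.incrementAndGet()
}

object FlakyS3Access {
  val checks = new AtomicInteger(0)
  val writes = new AtomicInteger(0)
}
```

The sink would then receive an `S3Access` through its constructor; since a MiniCluster-based test runs in the same JVM, the test can assert afterwards that `FlakyS3Access.writes` is non-zero and `FlakyS3Access.checks` exceeded the configured number of failures, which shows the retry loop kicked in without having to read any metrics.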

Cheers,
Till
