Testing Flink Jobs

Testing Flink Jobs

KristoffSC
Hi,
I would like to write a few tests that check the message flow in my
Flink pipeline.
I would like to base my tests on [1].

My StreamJob class, which contains the main method, has all sinks and sources
pluggable. The implementations are also based on [1].

In all examples available online I can see that in the actual test method
env.execute() is called, which starts deployment of a job.

However, in my case the deployment of the job takes a significant amount of
time. This is because we need to load some "special" libraries
that should not be mocked for tests. That is why we would like to do it
only once, i.e. deploy the job on a MiniCluster only once.

My StreamJob.main method contains all pipeline setup plus call to
env.execute().


However, when I do that, for example when I start my job in another
ClassRule method or BeforeClass method, the tests hang. The
JUnit thread is actually waiting on env.execute(), which in my case
never returns. However, the underlying MiniCluster is working fine.


Questions:
1. What would be the preferred way to set up my tests when I would like to
deploy my StreamJob only once?
2. How can I check whether the cluster used in my tests is ready and the job
deployment is finished?

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#testing-flink-jobs





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Testing Flink Jobs

Chesnay Schepler
1)
You can either execute the job in a separate thread, or set DeploymentOptions.ATTACHED to false in the MiniCluster configuration.
2)
The cluster not being ready is usually not really an issue. I wouldn't worry about it for the time being.
(The reason being that the MiniCluster resource already starts all components and at most requires components to connect to each other.)
As for when the job is finished, the MiniClusterWithClientResource provides you with a MiniClusterClient, on which you can call requestJobResult(JobID). This returns a future that is completed when the job has terminated. Alternatively you can poll the job status via #getJobStatus.
You can retrieve the JobID via #listJobs().
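
The first option from 1), running the blocking call on a separate thread, could look roughly like the sketch below. It uses only the JDK and no Flink calls. The class BackgroundJobRunner and its methods are hypothetical names, and the sleeping lambda is only a stand-in for a StreamJob.main / env.execute() call that never returns.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: starts a blocking "job" once on a background thread. */
public class BackgroundJobRunner {

    private final ExecutorService executor =
            Executors.newSingleThreadExecutor(r -> {
                Thread t = new Thread(r, "job-runner");
                t.setDaemon(true); // do not keep the JVM alive if stop() is never called
                return t;
            });

    private Future<?> running;

    /** Submits the blocking call and returns immediately to the caller. */
    public synchronized void start(Runnable blockingJob) {
        if (running != null) {
            throw new IllegalStateException("job already started");
        }
        running = executor.submit(blockingJob);
    }

    /** True while the submitted call has not returned yet. */
    public synchronized boolean isRunning() {
        return running != null && !running.isDone();
    }

    /** Interrupts the job thread and waits for it to terminate. */
    public synchronized boolean stop(long timeout, TimeUnit unit) throws InterruptedException {
        executor.shutdownNow();
        return executor.awaitTermination(timeout, unit);
    }

    public static void main(String[] args) throws Exception {
        BackgroundJobRunner runner = new BackgroundJobRunner();
        CountDownLatch started = new CountDownLatch(1);

        // Stand-in (assumption) for a streaming job whose execute() never returns.
        runner.start(() -> {
            started.countDown();
            try {
                Thread.sleep(Long.MAX_VALUE);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        started.await(5, TimeUnit.SECONDS);
        System.out.println("running: " + runner.isRunning());
        System.out.println("stopped: " + runner.stop(5, TimeUnit.SECONDS));
    }
}
```

In a JUnit test, start(...) would presumably go into the BeforeClass method and stop(...) into an AfterClass method. If StreamJob.main declares checked exceptions, the lambda passed to start(...) has to wrap them.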

(In reply to KristoffSC's post of 1/11/2021 4:21 PM, the first message in this thread.)



Re: Testing Flink Jobs

KristoffSC
Hi,
that helped, however there is a problem with JobStatus. Please refer to [1].

In my case the JobStatus is already RUNNING, but not all tasks are running.
Any idea how to get the task status from the MiniCluster?

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Issue-with-job-status-td36068.html#none




Re: Testing Flink Jobs

Chesnay Schepler
In that case you will have to query the REST API instead; you can retrieve the address via MiniCluster#getRestAddress.

Something along these lines should work:
try (final RestClient restClient =
        new RestClient(
                RestClientConfiguration.fromConfiguration(new Configuration()),
                // Flink's own Executors utility class, not java.util.concurrent.Executors
                // (which has no directExecutor())
                Executors.directExecutor())) {

    final JobID jobId = ...
    final URI restApiAddress = miniCluster.getRestAddress().get();

    // Build the request for the job details endpoint (/jobs/:jobid)
    final JobDetailsHeaders headers = JobDetailsHeaders.getInstance();
    final JobMessageParameters parameters = headers.getUnresolvedMessageParameters();
    parameters.jobPathParameter.resolve(jobId);

    final CompletableFuture<JobDetailsInfo> response =
            restClient.sendRequest(
                    restApiAddress.getHost(),
                    restApiAddress.getPort(),
                    headers,
                    parameters,
                    EmptyRequestBody.getInstance());

    // The job is fully up only once every job vertex reports RUNNING
    final boolean allTasksAreRunning =
            response.get().getJobVertexInfos().stream()
                    .map(JobDetailsInfo.JobVertexDetailsInfo::getExecutionState)
                    .allMatch(state -> state == ExecutionState.RUNNING);
}
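
Since the snippet above checks the task states only once, a test would probably repeat the check until it succeeds or a timeout expires. Below is a minimal, JDK-only sketch of such a polling loop. TaskStatePoller, awaitAllRunning and the State enum are hypothetical names; State merely stands in for Flink's ExecutionState, and the supplier stands in for the REST request. Unlike the check above, this sketch treats an empty list of states as "not ready yet", which is a deliberate assumption.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical helper: polls task states until all of them are RUNNING. */
public class TaskStatePoller {

    /** Stand-in (assumption) for Flink's ExecutionState; only a few values. */
    public enum State { CREATED, DEPLOYING, RUNNING }

    /**
     * Calls fetchStates repeatedly until every returned state is RUNNING.
     * Throws TimeoutException if that does not happen within the timeout.
     */
    public static void awaitAllRunning(
            Supplier<List<State>> fetchStates, Duration timeout, Duration interval)
            throws InterruptedException, TimeoutException {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            List<State> states = fetchStates.get();
            // An empty list is treated as "not ready" (assumption of this sketch).
            if (!states.isEmpty() && states.stream().allMatch(s -> s == State.RUNNING)) {
                return;
            }
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException("tasks not running, last states: " + states);
            }
            Thread.sleep(interval.toMillis());
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger calls = new AtomicInteger();
        // Fake status source: the second task is still DEPLOYING for the first two polls.
        Supplier<List<State>> fake = () ->
                calls.incrementAndGet() < 3
                        ? Arrays.asList(State.RUNNING, State.DEPLOYING)
                        : Arrays.asList(State.RUNNING, State.RUNNING);

        awaitAllRunning(fake, Duration.ofSeconds(5), Duration.ofMillis(10));
        System.out.println("all tasks running after " + calls.get() + " polls");
    }
}
```

In a real test, the supplier would wrap the REST request shown above and map the returned job vertex infos to their execution states.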

(In reply to KristoffSC's post of 1/12/2021 11:14 PM, the previous message in this thread.)