Hi,
I would like to write a few tests that check the message flow in my Flink pipeline, based on [1]. My StreamJob class, which has the main method, has all sinks and sources pluggable; the implementations are also based on [1].

In all the examples available online, env.execute() is called in the actual test method, which starts the deployment of the job. In my case, however, deploying the job takes a significant amount of time, because we need to load some "special" libraries that should not be mocked for tests. That is why we would like to call it only once, i.e. deploy the job on a MiniCluster only once.

My StreamJob.main method contains the whole pipeline setup plus the call to env.execute(). However, when I initiate my job from another ClassRule method or a BeforeClass method, the tests hang. The JUnit thread is actually waiting on env.execute(), which in my case never returns, even though the underlying MiniCluster is working fine.

Questions:
1. What would be the preferred way to set up my tests when I want to deploy my StreamJob only once?
2. How can I check whether the cluster used in my tests is ready and the job deployment is finished?

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#testing-flink-jobs

-- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
1) You can either execute the job in a separate thread, or set DeploymentOptions.ATTACHED to false in the MiniCluster configuration.
2) The cluster not being ready is usually not an issue, so I wouldn't worry about it for the time being. (The MiniCluster resource already starts all components; at most, they still need to connect to each other.)
As for when the job is finished: MiniClusterWithClientResource provides you with a MiniClusterClient, on which you can call requestJobResult(JobID). This returns a future that completes once the job has terminated. Alternatively, you can poll the job status via #getJobStatus. You can retrieve the JobID via #listJobs().
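For point 1, the separate-thread option can be sketched roughly as follows, assuming the blocking submission is wrapped in a lambda such as `() -> StreamJob.main(new String[0])` and started from a `@BeforeClass` method. `BackgroundJobLauncher`, `BlockingJob` and `launchInBackground` are hypothetical names used only for illustration, not Flink API. The runner is a daemon thread so that an `env.execute()` call that never returns cannot keep the JVM alive, and the returned future completes only if that call returns or throws.

```java
import java.util.concurrent.CompletableFuture;

/** Runs a blocking job submission (e.g. a main method ending in env.execute()) off the JUnit thread. */
final class BackgroundJobLauncher {

    /** Hypothetical functional interface for "something that blocks until the job ends". */
    @FunctionalInterface
    public interface BlockingJob {
        void run() throws Exception;
    }

    private BackgroundJobLauncher() {}

    /**
     * Starts the job on a daemon thread and returns immediately.
     * The future completes normally if run() returns, or exceptionally if it throws.
     */
    static CompletableFuture<Void> launchInBackground(BlockingJob job) {
        CompletableFuture<Void> outcome = new CompletableFuture<>();
        Thread runner = new Thread(() -> {
            try {
                job.run();
                outcome.complete(null);
            } catch (Throwable t) {
                outcome.completeExceptionally(t);
            }
        }, "stream-job-runner");
        // Daemon thread: a streaming job that never terminates will not block JVM shutdown.
        runner.setDaemon(true);
        runner.start();
        return outcome;
    }
}
```

Keeping the returned future in a static field lets each test fail fast, e.g. with `assertFalse(jobOutcome.isCompletedExceptionally())`, if the job crashed during deployment.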
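For point 2, the polling alternative can be kept out of the test bodies with a small generic helper. This is a sketch that assumes the probe wraps a blocking status lookup such as `() -> client.getJobStatus(jobId).get()`, where `client` is the cluster client obtained from the MiniClusterWithClientResource; `Polling` and `pollUntil` are hypothetical names, not part of Flink.

```java
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;

/** Generic "poll until a condition holds or time out" helper for tests. */
final class Polling {

    private Polling() {}

    /**
     * Calls probe repeatedly until accept returns true for its result, sleeping
     * interval between attempts, and returns the accepted value. Throws
     * TimeoutException if no accepted value is seen before timeout elapses.
     */
    static <T> T pollUntil(Callable<T> probe, Predicate<? super T> accept,
                           Duration timeout, Duration interval) throws Exception {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            T value = probe.call();
            if (accept.test(value)) {
                return value;
            }
            // Overflow-safe comparison of nanoTime values.
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException(
                    "Condition not met within " + timeout + "; last value: " + value);
            }
            Thread.sleep(interval.toMillis());
        }
    }
}
```

For example, `Polling.pollUntil(() -> client.getJobStatus(jobId).get(), s -> s == JobStatus.RUNNING, Duration.ofSeconds(30), Duration.ofMillis(100))` would wait up to 30 seconds for the job to report RUNNING. A fixed interval keeps the sketch simple; a backoff would also work for slow deployments.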
(In reply to KristoffSC, 1/11/2021 4:21 PM.)
Hi,
That helped, but there is a problem with JobStatus (please refer to [1]). In my case the JobStatus is already RUNNING, but not all tasks are running. Any idea how to get the task status from the MiniCluster?

[1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Issue-with-job-status-td36068.html#none
In that case you will have to query the REST API instead; you can retrieve its address via MiniCluster#getRestAddress. Something along these lines should work:
try (final RestClient restClient = new RestClient(
        RestClientConfiguration.fromConfiguration(new Configuration()),
        Executors.directExecutor())) {
    final JobID jobId = ... // e.g. obtained via #listJobs()

    // Base URI of the MiniCluster's REST endpoint.
    final URI restApiAddress = miniCluster.getRestAddress().get();

    // Request the job details (GET /jobs/:jobid) for the given job.
    final JobDetailsHeaders headers = JobDetailsHeaders.getInstance();
    final JobMessageParameters parameters = headers.getUnresolvedMessageParameters();
    parameters.jobPathParameter.resolve(jobId);

    final CompletableFuture<JobDetailsInfo> response = restClient.sendRequest(
        restApiAddress.getHost(),
        restApiAddress.getPort(),
        headers,
        parameters,
        EmptyRequestBody.getInstance());

    // The job is fully deployed only if every job vertex reports RUNNING.
    final boolean allTasksAreRunning = response.get().getJobVertexInfos().stream()
        .map(JobDetailsInfo.JobVertexDetailsInfo::getExecutionState)
        .allMatch(state -> state == ExecutionState.RUNNING);
}

(In reply to KristoffSC, 1/12/2021 11:14 PM.)