[FLINK-10868] the job cannot be exited immediately if job manager is timed out


Young
Hi ZhenQiu & trohrmann:

I have backported FLINK-10868 to Flink 1.5. Most of my jobs (all of them batch jobs) now exit immediately once the number of failed container requests reaches the upper limit, but some jobs still do not exit immediately. The logs show that in these jobs the job manager first times out for unknown reasons, and that code segment 1 runs after the job manager has timed out but before it has reconnected. I therefore suspect that the job manager is out of sync with the resource manager, and that the gateway call inside notifyAllocationFailure (code segment 2) is never executed.

I'm wondering whether you have encountered similar problems. Is there a solution? To make the job exit immediately, I am considering calling onFatalError() when jobManagerRegistration == null, so that the process exits at once. It is not yet clear to me whether this drastic approach has any side effects.

Thanks,
Young

Code segment 1 in ResourceManager.java:
    private void cancelAllPendingSlotRequests(Exception cause) {
        slotManager.cancelAllPendingSlotRequests(cause);
    }

Code segment 2 in ResourceManager.java:
    public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) {
        validateRunsInMainThread();
        log.info("Slot request with allocation id {} for job {} failed.", allocationId, jobId, cause);

        JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
        // If no job manager is registered for this job (for example after a timeout),
        // the failure is not forwarded to anyone.
        if (jobManagerRegistration != null) {
            jobManagerRegistration.getJobManagerGateway().notifyAllocationFailure(allocationId, cause);
        }
    }
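
For readers following along, the workaround proposed above can be sketched outside of Flink roughly as follows. This is only an illustration under stated assumptions: AllocationFailureSketch, JobManagerGateway, FatalErrorHandler, registerJobManager and disconnectJobManager are hypothetical stand-ins written for this example, job and allocation IDs are plain strings, and the handler here only reports the error instead of terminating the process. It is not the FLINK-10868 patch.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the resource manager logic discussed above; not Flink code. */
class AllocationFailureSketch {

    /** Hypothetical stand-in for the gateway used to reach a registered job manager. */
    interface JobManagerGateway {
        void notifyAllocationFailure(String allocationId, Exception cause);
    }

    /** Hypothetical stand-in for whatever handles a fatal error (a real one would exit the process). */
    interface FatalErrorHandler {
        void onFatalError(Throwable error);
    }

    private final Map<String, JobManagerGateway> jobManagerRegistrations = new HashMap<>();
    private final FatalErrorHandler fatalErrorHandler;

    AllocationFailureSketch(FatalErrorHandler fatalErrorHandler) {
        this.fatalErrorHandler = fatalErrorHandler;
    }

    void registerJobManager(String jobId, JobManagerGateway gateway) {
        jobManagerRegistrations.put(jobId, gateway);
    }

    /** Models the job manager timing out: its registration is dropped. */
    void disconnectJobManager(String jobId) {
        jobManagerRegistrations.remove(jobId);
    }

    void notifyAllocationFailure(String jobId, String allocationId, Exception cause) {
        JobManagerGateway gateway = jobManagerRegistrations.get(jobId);
        if (gateway != null) {
            // Normal path: forward the failure to the registered job manager.
            gateway.notifyAllocationFailure(allocationId, cause);
        } else {
            // Proposed workaround: nobody can be told about the failure, so escalate.
            fatalErrorHandler.onFatalError(new IllegalStateException(
                "No job manager registered for job " + jobId
                    + "; cannot report failure of allocation " + allocationId, cause));
        }
    }

    public static void main(String[] args) {
        AllocationFailureSketch rm = new AllocationFailureSketch(
            error -> System.out.println("fatal: " + error.getMessage()));
        rm.registerJobManager("job-1",
            (allocationId, cause) -> System.out.println("job manager notified: " + allocationId));
        rm.notifyAllocationFailure("job-1", "alloc-1", new Exception("container failed"));
        rm.disconnectJobManager("job-1");
        rm.notifyAllocationFailure("job-1", "alloc-2", new Exception("container failed"));
    }
}
```

One consequence visible in the sketch is that a single allocation failure for one unregistered job escalates to a process-wide fatal error, which may be too coarse if the same process serves other jobs or if the job manager is about to reconnect.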

Re: [FLINK-10868] the job cannot be exited immediately if job manager is timed out

Till Rohrmann
Hi Young,

As far as I can tell, FLINK-10868 has not been merged into Flink yet, so I cannot say much about how well it works. The case you are describing should be handled properly in the version that gets merged, though. I guess what needs to happen is that once the JM reconnects to the RM, it should synchronize its pending slot requests with the slot requests registered on the RM. But this should be a follow-up issue to FLINK-10868, because otherwise it would widen the scope too much.

Cheers,
Till
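
The synchronization idea mentioned in this reply could be sketched, very roughly, as a set reconciliation performed when the JM reconnects. Everything below is an assumption made for illustration: SlotRequestSyncSketch, SyncResult and reconcile are hypothetical names, allocation IDs are plain strings, and what to do with each resulting set (fail, re-request, or cancel) is left open. It is one possible reading of the suggestion, not how Flink implements it.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch of reconciling slot requests after a JM reconnects to the RM; not Flink code. */
class SlotRequestSyncSketch {

    /** Result of comparing the JM's view with the RM's view. */
    static final class SyncResult {
        /** Requests the JM still waits for but the RM no longer knows (e.g. cancelled while disconnected). */
        final Set<String> missingOnRm;
        /** Requests the RM still tracks but the JM is no longer waiting for. */
        final Set<String> staleOnRm;

        SyncResult(Set<String> missingOnRm, Set<String> staleOnRm) {
            this.missingOnRm = missingOnRm;
            this.staleOnRm = staleOnRm;
        }
    }

    /** Computes both set differences without modifying the inputs. */
    static SyncResult reconcile(Set<String> pendingOnJm, Set<String> registeredOnRm) {
        Set<String> missingOnRm = new TreeSet<>(pendingOnJm);
        missingOnRm.removeAll(registeredOnRm);

        Set<String> staleOnRm = new TreeSet<>(registeredOnRm);
        staleOnRm.removeAll(pendingOnJm);

        return new SyncResult(missingOnRm, staleOnRm);
    }

    public static void main(String[] args) {
        Set<String> pendingOnJm = new TreeSet<>(Arrays.asList("alloc-1", "alloc-2", "alloc-3"));
        Set<String> registeredOnRm = new TreeSet<>(Arrays.asList("alloc-2", "alloc-4"));

        SyncResult result = reconcile(pendingOnJm, registeredOnRm);
        // The JM could fail or re-send these requests instead of waiting indefinitely.
        System.out.println("missing on RM: " + result.missingOnRm);
        // The RM could cancel these, since nobody is waiting for them any more.
        System.out.println("stale on RM: " + result.staleOnRm);
    }
}
```

In the scenario from the original post, the requests cancelled by code segment 1 while the JM was disconnected would show up in the first set, so the JM would learn about them on reconnect rather than never being notified.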

On Fri, Jun 21, 2019 at 2:43 PM Young <[hidden email]> wrote: