Fail to cancel perJob for that deregisterApplication is not called


liujiangang
I am using Flink 1.10.0, and my per-job cluster cannot be cancelled. From the log I can see that webMonitorEndpoint.closeAsync() completes, but deregisterApplication is never called. The relevant code is:
public CompletableFuture<Void> deregisterApplicationAndClose(
        final ApplicationStatus applicationStatus,
        final @Nullable String diagnostics) {

    if (isRunning.compareAndSet(true, false)) {
        final CompletableFuture<Void> closeWebMonitorAndDeregisterAppFuture =
            FutureUtils.composeAfterwards(webMonitorEndpoint.closeAsync(), () ->
                deregisterApplication(applicationStatus, diagnostics, resourceManager.getJobId()));

        return FutureUtils.composeAfterwards(closeWebMonitorAndDeregisterAppFuture, this::closeAsyncInternal);
    } else {
        return terminationFuture;
    }
}
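To make the control flow of deregisterApplicationAndClose concrete, here is a minimal, self-contained sketch in plain Java. The composeAfterwards helper below is a hypothetical stand-in that assumes FutureUtils.composeAfterwards runs the supplied action once the first future completes, normally or exceptionally, and completes its result when the action's future does; it is not Flink's code. The Component class and its deregisterCalls counter are likewise illustrative only. The sketch shows two things: the deregistration step cannot start until the web monitor's close future completes, and only the caller that wins isRunning.compareAndSet(true, false) triggers deregistration, while any later caller just gets terminationFuture back.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class ComposeAfterwardsSketch {

    // Hypothetical stand-in for FutureUtils.composeAfterwards: run `action` once `future`
    // completes (normally or exceptionally); complete the result when the action's future does.
    static CompletableFuture<Void> composeAfterwards(
            CompletableFuture<?> future, Supplier<CompletableFuture<?>> action) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        future.whenComplete((ignored, outerError) ->
            action.get().whenComplete((ignored2, innerError) -> {
                if (innerError != null) {
                    result.completeExceptionally(innerError);
                } else if (outerError != null) {
                    result.completeExceptionally(outerError);
                } else {
                    result.complete(null);
                }
            }));
        return result;
    }

    // Hypothetical component with the same shape as deregisterApplicationAndClose.
    static class Component {
        final AtomicBoolean isRunning = new AtomicBoolean(true);
        final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
        final AtomicInteger deregisterCalls = new AtomicInteger();

        CompletableFuture<Void> deregisterApplicationAndClose(CompletableFuture<Void> webMonitorClosed) {
            if (isRunning.compareAndSet(true, false)) {
                // Step 1: wait for the web monitor to close, then "deregister".
                CompletableFuture<Void> closedAndDeregistered = composeAfterwards(webMonitorClosed, () -> {
                    deregisterCalls.incrementAndGet();
                    return CompletableFuture.completedFuture(null);
                });
                // Step 2: afterwards, finish closing the component.
                return composeAfterwards(closedAndDeregistered, () -> {
                    terminationFuture.complete(null);
                    return terminationFuture;
                });
            } else {
                // A later caller never deregisters; it only waits for termination.
                return terminationFuture;
            }
        }
    }

    public static void main(String[] args) {
        Component c = new Component();
        CompletableFuture<Void> webMonitorClosed = new CompletableFuture<>();
        CompletableFuture<Void> first = c.deregisterApplicationAndClose(webMonitorClosed);
        CompletableFuture<Void> second = c.deregisterApplicationAndClose(webMonitorClosed);
        System.out.println(c.deregisterCalls.get()); // 0: the web monitor has not closed yet
        webMonitorClosed.complete(null);
        System.out.println(c.deregisterCalls.get()); // 1
        System.out.println(first.isDone() && second.isDone()); // true
    }
}
```

In the sketch, the second call returns terminationFuture without ever touching deregisterCalls, so only one deregistration is attempted no matter how many callers race to close.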
For webMonitorEndpoint.closeAsync(), the code is as follows:
public CompletableFuture<Void> closeAsync() {
    synchronized (lock) {
        log.info("State is {}. Shutting down rest endpoint.", state);

        if (state == State.RUNNING) {
            final CompletableFuture<Void> shutDownFuture = FutureUtils.composeAfterwards(
                closeHandlersAsync(),
                this::shutDownInternal);

            shutDownFuture.whenComplete(
                (Void ignored, Throwable throwable) -> {
                    log.info("Shut down complete.");
                    if (throwable != null) {
                        terminationFuture.completeExceptionally(throwable);
                    } else {
                        terminationFuture.complete(null);
                    }
                });
            state = State.SHUTDOWN;
        } else if (state == State.CREATED) {
            terminationFuture.complete(null);
            state = State.SHUTDOWN;
        }

        return terminationFuture;
    }
}
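A similarly stripped-down sketch of the closeAsync() state handling quoted above. Endpoint, State and the exposed shutDownFuture field are hypothetical; shutDownFuture stands in for the future built from closeHandlersAsync() and shutDownInternal(), and logging is omitted. It shows that the returned terminationFuture only completes once that shutdown future does, that a never-started (CREATED) endpoint completes immediately, and that repeated calls hand back the same future.

```java
import java.util.concurrent.CompletableFuture;

public class RestEndpointCloseSketch {

    enum State { CREATED, RUNNING, SHUTDOWN }

    // Hypothetical endpoint reproducing only the state handling of closeAsync().
    static class Endpoint {
        private final Object lock = new Object();
        private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
        // Stand-in for closeHandlersAsync() followed by shutDownInternal().
        final CompletableFuture<Void> shutDownFuture = new CompletableFuture<>();
        State state;

        Endpoint(State initial) {
            this.state = initial;
        }

        CompletableFuture<Void> closeAsync() {
            synchronized (lock) {
                if (state == State.RUNNING) {
                    // terminationFuture mirrors the outcome of the asynchronous shutdown.
                    shutDownFuture.whenComplete((ignored, throwable) -> {
                        if (throwable != null) {
                            terminationFuture.completeExceptionally(throwable);
                        } else {
                            terminationFuture.complete(null);
                        }
                    });
                    state = State.SHUTDOWN;
                } else if (state == State.CREATED) {
                    // Never started: nothing to shut down.
                    terminationFuture.complete(null);
                    state = State.SHUTDOWN;
                }
                // In SHUTDOWN, repeated calls just return the same future.
                return terminationFuture;
            }
        }
    }

    public static void main(String[] args) {
        Endpoint running = new Endpoint(State.RUNNING);
        CompletableFuture<Void> f1 = running.closeAsync();
        CompletableFuture<Void> f2 = running.closeAsync();
        System.out.println(f1 == f2);    // true: same termination future
        System.out.println(f1.isDone()); // false until the shutdown finishes
        running.shutDownFuture.complete(null);
        System.out.println(f1.isDone()); // true

        Endpoint created = new Endpoint(State.CREATED);
        System.out.println(created.closeAsync().isDone()); // true
    }
}
```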
I am sure that it completes, judging by the log line I added:
[Screenshot of the log output: image.png]

However, for deregisterApplication I do not see any related log line, such as "Shut down cluster because application is in {}, diagnostics {}.".
Can anyone give me some suggestions? Thank you.
      


Re: Fail to cancel perJob for that deregisterApplication is not called

Chesnay Schepler
Where exactly did you add your own log message?

WebMonitorEndpoint.closeAsync() already logs on its own whether the shutdown future was completed, meaning that it shouldn't have been necessary to add a separate log message.
If you now only see the one you added, chances are that it was added in the wrong place.
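On the question of where a log line sits relative to the futures: with the JDK's CompletableFuture, non-async dependents registered before completion (such as whenComplete callbacks) are typically run by the thread that calls complete(), before complete() returns; the Javadoc permits this but does not strictly require it. The sketch below records events in a list instead of logging, and all names in it are illustrative. It suggests that a log statement placed after terminationFuture.complete(null) will usually appear only after whatever was chained onto that future has already run, while one placed before it says nothing about those dependents.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class CompletionOrderSketch {

    // Records the order in which the steps run; the list stands in for log output.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

        // Registered before completion, like a step chained onto webMonitorEndpoint.closeAsync().
        terminationFuture.whenComplete((ignored, throwable) -> events.add("dependent action"));

        events.add("before complete");
        terminationFuture.complete(null); // runs the registered dependent on this thread
        events.add("after complete");
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [before complete, dependent action, after complete]
    }
}
```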

On 3/24/2021 5:06 AM, 刘建刚 wrote:
> [quoted original message snipped]



Re: Fail to cancel perJob for that deregisterApplication is not called

liujiangang
Thank you for the answer.

I ran into the same problem again. This time I added a log statement to RestServerEndpoint's closeAsync method, as follows:
@Override
public CompletableFuture<Void> closeAsync() {
    synchronized (lock) {
        log.info("State is {}. Shutting down rest endpoint.", state);

        if (state == State.RUNNING) {
            final CompletableFuture<Void> shutDownFuture = FutureUtils.composeAfterwards(
                closeHandlersAsync(),
                this::shutDownInternal);

            shutDownFuture.whenComplete(
                (Void ignored, Throwable throwable) -> {
                    if (throwable != null) {
                        terminationFuture.completeExceptionally(throwable);
                    } else {
                        terminationFuture.complete(null);
                    }
                    log.info("Shut down complete with {}.", terminationFuture);
                });
            state = State.SHUTDOWN;
        } else if (state == State.CREATED) {
            terminationFuture.complete(null);
            state = State.SHUTDOWN;
        }

        return terminationFuture;
    }
}
After closeAsync completes, DispatcherResourceManagerComponent's deregisterApplication method is expected to run, as shown here:
public CompletableFuture<Void> deregisterApplicationAndClose(
        final ApplicationStatus applicationStatus,
        final @Nullable String diagnostics) {

    if (isRunning.compareAndSet(true, false)) {
        final CompletableFuture<Void> closeWebMonitorAndDeregisterAppFuture =
            FutureUtils.composeAfterwards(webMonitorEndpoint.closeAsync(), () ->
                deregisterApplication(applicationStatus, diagnostics, resourceManager.getJobId()));

        return FutureUtils.composeAfterwards(closeWebMonitorAndDeregisterAppFuture, this::closeAsyncInternal);
    } else {
        return terminationFuture;
    }
}
However, the ResourceManager's deregisterApplication method is never executed, and I do not know why. Any suggestions?
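One general CompletableFuture behavior that may be worth keeping in mind when a chained step silently never finishes: if the callback passed to whenComplete throws, the exception is recorded only in the stage that whenComplete returns and is not rethrown to the thread that completed the original future, so if that stage is never inspected, nothing is logged. The sketch below assumes, without checking the Flink source, that the composed action runs inside such a callback. lookupJobId is a hypothetical helper that always fails, standing in for any expression evaluated while building the deregistration call.

```java
import java.util.concurrent.CompletableFuture;

public class SwallowedExceptionSketch {

    // Hypothetical helper standing in for an argument expression that fails at call time.
    static String lookupJobId() {
        throw new IllegalStateException("job id not available");
    }

    public static void main(String[] args) {
        CompletableFuture<Void> webMonitorClosed = new CompletableFuture<>();
        CompletableFuture<Void> deregistered = new CompletableFuture<>();

        // The follow-up step runs inside whenComplete. Its exception ends up only in the
        // stage returned by whenComplete, which is discarded here.
        webMonitorClosed.whenComplete((ignored, error) -> {
            String jobId = lookupJobId();
            System.out.println("Deregistering " + jobId); // never reached
            deregistered.complete(null);
        });

        webMonitorClosed.complete(null);           // returns normally; no stack trace is printed
        System.out.println(deregistered.isDone()); // false: the follow-up step never finished
    }
}
```

Attaching a logging whenComplete or exceptionally to the stage that whenComplete returns would make such a failure visible in the sketch.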




On Fri, Mar 26, 2021 at 6:54 PM, Chesnay Schepler wrote:
> [quoted earlier messages snipped]