NPE when restoring from savepoint in Flink 1.13.1 application

NPE when restoring from savepoint in Flink 1.13.1 application

ChangZhuo Chen (陳昌倬)
Hi,

We get a NullPointerException when trying to restore from a savepoint,
whether with the same jar, a different jar, or a different parallelism.
We have worked around this issue by changing the UIDs of almost all
operators. We want to know if there is any way to avoid this problem so
that it will not cause service maintenance problems. Thanks.
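
For context on the UID workaround: Flink matches the state stored in a savepoint to operators by operator ID, which is derived from the UID when one is set. The sketch below is a toy model in plain Java, not Flink code; the class name, the `restore` helper, and the UID strings are all hypothetical. It only illustrates why an operator with a changed UID finds no state to restore, which presumably is how the workaround sidesteps the failing state reassignment. In a real job this also means the old state of those operators is dropped, and restoring typically requires the `--allowNonRestoredState` option.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model only: illustrates UID-based state matching, not Flink's implementation. */
public class UidStateMatching {

    /**
     * For each operator UID in the new job, look up the state stored under
     * that UID in the savepoint. A null value means nothing matched, so the
     * operator would start with empty state.
     */
    static Map<String, String> restore(Map<String, String> savepoint, List<String> jobUids) {
        Map<String, String> assigned = new LinkedHashMap<>();
        for (String uid : jobUids) {
            assigned.put(uid, savepoint.get(uid));
        }
        return assigned;
    }

    public static void main(String[] args) {
        // Hypothetical savepoint contents, keyed by operator UID.
        Map<String, String> savepoint = new HashMap<>();
        savepoint.put("window-agg", "keyed-state-A");
        savepoint.put("dedup", "keyed-state-B");

        // Same UIDs: every operator gets its old state back.
        System.out.println(restore(savepoint, List.of("window-agg", "dedup")));

        // Changed UIDs: nothing matches, so no state is reassigned.
        System.out.println(restore(savepoint, List.of("window-agg-v2", "dedup-v2")));
    }
}
```
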


The following is the redacted stack trace we can provide for now:

    2021-06-09 13:08:59,849 WARN  org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - Could not execute application:
    org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute job '<censored>'.
            at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
            at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
            at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
            at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
            at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
            at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
            at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) [?:?]
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
            at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
            at java.lang.Thread.run(Thread.java:834) [?:?]
    Caused by: org.apache.flink.util.FlinkException: Failed to execute job '<censored>'.
            at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
            at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
            at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
            at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
            at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:801) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
            at <censored> ~[?:?]
            at <censored> ~[?:?]
            at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
            at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
            at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
            at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
            at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
            ... 12 more
    Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
            at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
            at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
            at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) ~[?:?]
            at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
            ... 1 more
    Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
            at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
            at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
            at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
            at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
            at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705) ~[?:?]
            ... 6 more
    Caused by: java.util.concurrent.CompletionException: java.lang.NullPointerException
            at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314) ~[?:?]
            at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319) ~[?:?]
            at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702) ~[?:?]
            ... 6 more
    Caused by: java.lang.NullPointerException
            at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.reAssignSubKeyedStates(StateAssignmentOperation.java:300) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
            at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.lambda$reDistributeKeyedStates$0(StateAssignmentOperation.java:260) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
            at java.util.HashMap.forEach(HashMap.java:1336) ~[?:?]
            at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.reDistributeKeyedStates(StateAssignmentOperation.java:252) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
            at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignAttemptState(StateAssignmentOperation.java:196) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
            at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:139) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
            at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1562) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
            at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1642) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
            at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.tryRestoreExecutionGraphFromSavepoint(DefaultExecutionGraphFactory.java:163) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
            at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:138) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
            at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
            at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:190) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
            at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:120) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
            at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
            at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
            at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
            at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:317) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
            at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
            at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
            at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
            ... 7 more
    2021-06-09 13:08:59,852 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - Exception occurred in REST handler: Could not execute application.


--
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B

Re: NPE when restoring from savepoint in Flink 1.13.1 application

Roman Khachatryan
Hi ChangZhuo,

Thanks for reporting, it looks like a bug.
I've opened a ticket for that [1].

[1]
https://issues.apache.org/jira/browse/FLINK-22966

Regards,
Roman

Re: NPE when restoring from savepoint in Flink 1.13.1 application

ChangZhuo Chen (陳昌倬)
On Thu, Jun 10, 2021 at 07:10:45PM +0200, Roman Khachatryan wrote:
> Hi ChangZhuo,
>
> Thanks for reporting, it looks like a bug.
> I've opened a ticket for that [1].
>
> [1]
> https://issues.apache.org/jira/browse/FLINK-22966

Thanks for the help.


--
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B
