Hi everyone,
I'm getting the following error when trying to restore from a savepoint. Below is the output from the flink binary, and the attachment is a TM log. I made no changes to the app between taking the savepoint and restoring it. All Window operators have been assigned a unique ID string.

Could you please take a look?

Thanks and best regards,
Averell

taskmanager.gz <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/taskmanager.gz>

org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 606ad5239f5e23cedb85d3e75bf76463)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:664)
    at com.nbnco.csa.analysis.copper.sdc.flink.StreamingSdcWithAverageByDslam$.main(StreamingSdcWithAverageByDslam.scala:442)
    at com.nbnco.csa.analysis.copper.sdc.flink.StreamingSdcWithAverageByDslam.main(StreamingSdcWithAverageByDslam.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
    ... 22 more
Caused by: java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_b7287b12f90aa788ab162856424c6d40_(8/64) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:279)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:133)
    ... 5 more
Caused by: java.lang.IllegalStateException: Unexpected key-group in restore.
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:475)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:438)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:377)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:105)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Hi Averell,
Are you trying to scale the job up, i.e. did you increase the job parallelism? Have you increased the job's max parallelism by chance? If so, this is not supported. The max parallelism parameter is used to create key groups, which are then assigned to the parallel operator instances. This parameter cannot be changed for a job that is to be restored.

If this is not the case, maybe Stefan (cc) has some ideas about what could be going wrong.

Best,
Dawid

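The relationship between keys, key groups, and parallel subtasks that Dawid describes can be sketched roughly as follows. This is a simplified model, not Flink's actual code: the class `KeyGroupSketch`, its method names, and the `scramble` function are made up for illustration (Flink, to my understanding, applies a murmur hash to `key.hashCode()` before taking the modulo). The point is only that the key group depends on the max parallelism, so state written under one value cannot be located under another.

```java
/** Simplified, illustrative model of key-group assignment; NOT Flink's implementation. */
class KeyGroupSketch {

    // ASSUMPTION: stand-in bit mixer; Flink uses its own murmur-hash utility here.
    static int scramble(int h) {
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        return h;
    }

    /** Maps a key to one of maxParallelism key groups. */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(scramble(key.hashCode()), maxParallelism);
    }

    /** Maps a key group to the parallel subtask that owns it. */
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        String key = "customer-42"; // hypothetical key
        // The same key is evaluated under two different max parallelism values.
        for (int maxParallelism : new int[] {128, 256}) {
            int keyGroup = keyGroupFor(key, maxParallelism);
            int subtask = operatorIndexFor(keyGroup, maxParallelism, 64);
            System.out.println("maxParallelism=" + maxParallelism
                    + " keyGroup=" + keyGroup + " subtask=" + subtask);
        }
    }
}
```

Changing only the parallelism (the last argument of `operatorIndexFor`) moves whole key groups between subtasks, which is why rescaling is possible while changing the max parallelism is not.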
Hi,
Adding to Dawid's questions, it would also be very helpful to know which Flink version was used to create the savepoint, which Flink version was used in the restore attempt, and whether the savepoint was moved or modified. Outside of potential conflicts with those things, I would not expect anything like this.

Best,
Stefan

Hi Stefan, Dawid,
I hadn't changed anything in the configuration. The environment's parallelism stayed at 64. Some source/sink operators have a parallelism of 1 to 8. I'm using Flink 1.7-SNAPSHOT, with the code pulled from the master branch about 5 days ago. The savepoint was saved to either S3 or HDFS (I tried multiple times), and had not been moved.

Is there any kind of improper user code that can cause such an error?

Thanks for your support.

Best regards,
Averell

Hi Averell,
In the logs there are some "Split Reader: Custom File Source:" entries. Is this a custom source that you implemented? Also, is your keySelector deterministic, with proper equals and hashCode methods?

Cheers,
Kostas

Hi Kostas,
Yes, I modified ContinuousFileMonitoringFunction to add one more ListState<Long>. The error might well have come from that, but I haven't been able to find out why.

All of my keyed streams are keyed by Scala tuples, like /keyBy(r => (r.customer_id, r.address))/, and the fields used as keys are of type either String or Long. For this, I don't have to define equals and hashCode methods, do I?

Thanks and best regards,
Averell

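What Kostas means by a deterministic key can be illustrated with a small sketch. It is written in Java rather than Scala, and the class `KeyDeterminismSketch` and its helper names are hypothetical. It contrasts a value-based composite key, whose equals and hashCode depend only on the field values, with an array key, whose equals and hashCode depend on object identity. As far as I know, Scala tuples of String and Long behave like the value-based case, since tuples are case classes with structural equality.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative only: value-based versus identity-based composite keys. */
class KeyDeterminismSketch {

    /** Value-based key: equality and hash are derived from the contained values. */
    static List<Object> valueKey(String customerId, long address) {
        return Arrays.<Object>asList(customerId, address);
    }

    /** Array key: equality and hash are derived from object identity. */
    static Object[] arrayKey(String customerId, long address) {
        return new Object[] {customerId, address};
    }

    public static void main(String[] args) {
        List<Object> a = valueKey("c1", 5L);
        List<Object> b = valueKey("c1", 5L);
        // Two separately built value keys are equal and hash identically.
        System.out.println("value keys equal: " + a.equals(b));
        System.out.println("value hashes equal: " + (a.hashCode() == b.hashCode()));

        Object[] x = arrayKey("c1", 5L);
        Object[] y = arrayKey("c1", 5L);
        // Two separately built arrays are different keys despite equal content.
        System.out.println("array keys equal: " + x.equals(y));
    }
}
```

A key of the second kind would be routed inconsistently between runs, whereas a key of the first kind always lands in the same key group.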
Are you restoring your job with the custom source from a savepoint taken without the custom source?
Hi Kostas,
No, the same code was used. I (1) started the job, (2) created a savepoint, (3) cancelled the job, and (4) restored the job with the same command as in (1), with the addition of "-s <savepoint_path>".

Regards,
Averell

Hi everyone,
In the StreamExecutionEnvironment.createFileInput method, a file source is created as follows:

    SingleOutputStreamOperator<OUT> source = addSource(monitoringFunction, sourceName)
        .transform("Split Reader: " + sourceName, typeInfo, reader);

Does this create two different operators? If yes, then it seems impossible to assign a UID to the first operator. Might that be the cause of my problem?

Thanks and best regards,
Averell

Hi Averell,
This could be the root cause of your problem! Thanks for digging into it. Would it be possible for you to verify that this is your problem by manually setting the UID and seeing if the problem disappears? In addition, please file a JIRA.

Thanks a lot,
Kostas

Hi,
I think it is rather unlikely that this is the problem, because it should give a different kind of exception. Would it be possible to provide minimal, self-contained example code for a problematic job?

Best,
Stefan

Hi Kostas, Stefan,
The problem doesn't occur on all of my builds, so it is a little bit difficult to track. Are there any specific classes that I can turn DEBUG on for to help find the problem? (Turning DEBUG on globally seems too much.) I will try to minimize the code and post it.

One more thing I noticed is that the error doesn't stay on one single operator but changes from time to time (even within the same build). For example, the previous exception I quoted was with a Window operator, while the one below is with CoStreamFlatMap.

Thanks and best regards,
Averell

Caused by: java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for CoStreamFlatMap_68cd726422cf10170c4d6c7fd52ed309_(12/64) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:279)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:133)
    ... 5 more
Caused by: java.lang.IllegalStateException: Unexpected key-group in restore.
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:475)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:438)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:377)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:105)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
    ... 7 more

Hi,
I see. Then the important question for me is whether the problem exists on the release/master code or just on your branches. Of course, we can hardly give any advice for custom builds and without any code. In general, you should debug in HeapKeyedStateBackend, lines 774-776 (the write part), and check against lines 472-474 (the read part). What happens there is very straightforward: remember the offset of the output stream, then write the key-group. The read part then seeks to the remembered offset and reads the key-group. They must match.

Best,
Stefan

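The write/read symmetry Stefan describes can be modelled in a few lines. This is a minimal sketch under stated assumptions, not the HeapKeyedStateBackend code: the class `KeyGroupOffsetSketch`, the `[id][payload]` layout, and the payload strings are all hypothetical. It records the stream offset before writing each key-group id, then seeks back to each offset and checks that the id found there matches.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative model of the offset bookkeeping; NOT Flink's implementation. */
class KeyGroupOffsetSketch {

    /** Writes each key group as [id][payload] and records the offset where it starts. */
    static byte[] write(int[] keyGroups, Map<Integer, Long> offsets) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        for (int keyGroup : keyGroups) {
            offsets.put(keyGroup, (long) out.size()); // remember the current offset
            out.writeInt(keyGroup);                   // write the key-group id
            out.writeUTF("state-of-" + keyGroup);     // hypothetical state payload
        }
        out.flush();
        return bytes.toByteArray();
    }

    /** Seeks to each remembered offset and checks the key-group id stored there. */
    static void verify(byte[] data, Map<Integer, Long> offsets) throws IOException {
        for (Map.Entry<Integer, Long> entry : offsets.entrySet()) {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            in.skipBytes(entry.getValue().intValue());
            int found = in.readInt();
            if (found != entry.getKey()) {
                throw new IllegalStateException("Unexpected key-group in restore: expected "
                        + entry.getKey() + " but found " + found);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Map<Integer, Long> offsets = new LinkedHashMap<>();
        byte[] data = write(new int[] {7, 8, 9}, offsets);
        verify(data, offsets); // passes: offsets and ids agree
        offsets.put(7, 1L);    // simulate an offset that no longer matches the data
        try {
            verify(data, offsets);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the check can only fail if the recorded offsets and the written bytes disagree, which is why comparing both sides in a debugger narrows the problem down to either the write path or the data handed to the read path.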
Thank you Stefan, I'll try to follow your guide to debug.
And sorry for the confusion in my previous email. When I said "different builds", I meant different versions of my application, not different builds of Flink. Between versions of my application, I do add or remove some operators. However, as I mentioned in the first email, I got the errors when restoring a savepoint created by the same version of my application.

Thanks and best regards,
Averell