Flink checkStateMappingCompleteness doesn't include UserDefinedOperatorIDs

Bashar

StateAssignmentOperation.checkStateMappingCompleteness doesn't check UserDefinedOperatorIDs (specified using setUidHash). As a result, the exception

 java.lang.IllegalStateException: There is no operator for the state {}

is thrown when savepoint state can't be mapped to an ExecutionJobVertex, even when the operator hash is explicitly specified.

I believe this logic should be extended to also include UserDefinedOperatorIDs, like so:

for (ExecutionJobVertex executionJobVertex : tasks) {
  allOperatorIDs.addAll(executionJobVertex.getOperatorIDs());
  allOperatorIDs.addAll(executionJobVertex.getUserDefinedOperatorIDs());
}
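
To make the effect concrete, here is a minimal, self-contained sketch that does not use Flink's classes. Everything in it is a hypothetical stand-in: `Vertex` plays the role of ExecutionJobVertex, plain strings replace OperatorID, and `unmappedStates` is a simplified model of the completeness check, not the actual Flink implementation.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified model of the completeness check; all names here are hypothetical.
final class MappingCheckSketch {

    // Stand-in for ExecutionJobVertex, holding both kinds of operator IDs as strings.
    static final class Vertex {
        final List<String> operatorIDs;            // IDs generated for the operators
        final List<String> userDefinedOperatorIDs; // IDs assumed to come from setUidHash

        Vertex(List<String> operatorIDs, List<String> userDefinedOperatorIDs) {
            this.operatorIDs = operatorIDs;
            this.userDefinedOperatorIDs = userDefinedOperatorIDs;
        }
    }

    // Returns the state IDs that match no operator ID known to the check.
    static Set<String> unmappedStates(
            Set<String> stateIDs, List<Vertex> tasks, boolean includeUserDefined) {
        Set<String> allOperatorIDs = new HashSet<>();
        for (Vertex vertex : tasks) {
            allOperatorIDs.addAll(vertex.operatorIDs);
            if (includeUserDefined) {
                // The proposed extension: also collect the user-defined IDs.
                allOperatorIDs.addAll(vertex.userDefinedOperatorIDs);
            }
        }
        Set<String> unmapped = new HashSet<>(stateIDs);
        unmapped.removeAll(allOperatorIDs);
        return unmapped;
    }

    public static void main(String[] args) {
        Vertex vertex = new Vertex(List.of("generated-1"), List.of("legacy-hash-1"));
        Set<String> savepointStates = Set.of("legacy-hash-1");

        // Without the extension, the state keyed by the user-defined hash is unmapped.
        System.out.println(unmappedStates(savepointStates, List.of(vertex), false)); // [legacy-hash-1]
        // With the extension, every state finds an operator.
        System.out.println(unmappedStates(savepointStates, List.of(vertex), true));  // []
    }
}
```

In this model, a non-empty result corresponds to the case where the IllegalStateException above would be thrown.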

I filed https://issues.apache.org/jira/browse/FLINK-16638. Please let me know if I am missing something here.


Re: Flink checkStateMappingCompleteness doesn't include UserDefinedOperatorIDs

rmetzger0
Thanks a lot for reporting this potential issue. The ticket looks good.

I would suggest keeping the discussion on the ticket. Otherwise, information may end up split between this list and the ticket.
I'm sure a committer will soon look at it.

On Wed, Mar 18, 2020 at 5:19 AM Bashar Abdul-Jawad <[hidden email]> wrote:

> StateAssignmentOperation.checkStateMappingCompleteness doesn't check for UserDefinedOperatorIDs (specified using setUidHash), causing the exception:
>
>  java.lang.IllegalStateException: There is no operator for the state {}
>
> to be thrown when a savepoint can't be mapped to an ExecutionJobVertex, even when the operator hash is explicitly specified.
>
> I believe this logic should be extended to also include UserDefinedOperatorIDs as so:
>
> for (ExecutionJobVertex executionJobVertex : tasks) {
>   allOperatorIDs.addAll(executionJobVertex.getOperatorIDs());
>   allOperatorIDs.addAll(executionJobVertex.getUserDefinedOperatorIDs());
> }
>
> I filed https://issues.apache.org/jira/browse/FLINK-16638, please let me know if I am missing something here.