Flink checkStateMappingCompleteness doesn't include UserDefinedOperatorIDs

Posted by Bashar on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Flink-checkStateMappingCompleteness-doesn-t-include-UserDefinedOperatorIDs-tp33750.html

StateAssignmentOperation.checkStateMappingCompleteness only considers the generated operator IDs and ignores UserDefinedOperatorIDs (specified using setUidHash). As a result, the exception:

 java.lang.IllegalStateException: There is no operator for the state {}

is thrown when savepoint state can't be mapped to an ExecutionJobVertex through its generated IDs, even when the operator hash is explicitly specified.

I believe this logic should be extended to also include UserDefinedOperatorIDs, like so:

for (ExecutionJobVertex executionJobVertex : tasks) {
  allOperatorIDs.addAll(executionJobVertex.getOperatorIDs());
  allOperatorIDs.addAll(executionJobVertex.getUserDefinedOperatorIDs());
}

I filed https://issues.apache.org/jira/browse/FLINK-16638. Please let me know if I am missing something here.
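
To make the difference concrete, here is a small self-contained sketch of the check as I understand it. It is a simplified model, not Flink's actual code: the Vertex class, the unmappedStates helper, the includeUserDefined flag and the string IDs are all hypothetical stand-ins for ExecutionJobVertex, the completeness check and OperatorID.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CompletenessSketch {

    /** Hypothetical stand-in for ExecutionJobVertex: generated IDs plus user-defined ones. */
    static final class Vertex {
        final List<String> operatorIDs;
        final List<String> userDefinedOperatorIDs;

        Vertex(List<String> operatorIDs, List<String> userDefinedOperatorIDs) {
            this.operatorIDs = operatorIDs;
            this.userDefinedOperatorIDs = userDefinedOperatorIDs;
        }
    }

    /**
     * Returns the operator IDs found in the savepoint that match no operator in the job.
     * With includeUserDefined == false this models the current behaviour described above;
     * with true it models the proposed change.
     */
    static Set<String> unmappedStates(
            Set<String> stateOperatorIDs, List<Vertex> tasks, boolean includeUserDefined) {
        Set<String> allOperatorIDs = new HashSet<>();
        for (Vertex vertex : tasks) {
            allOperatorIDs.addAll(vertex.operatorIDs);
            if (includeUserDefined) {
                allOperatorIDs.addAll(vertex.userDefinedOperatorIDs);
            }
        }
        // Whatever is left after removing the known IDs has no operator to restore into.
        Set<String> unmapped = new HashSet<>(stateOperatorIDs);
        unmapped.removeAll(allOperatorIDs);
        return unmapped;
    }

    public static void main(String[] args) {
        // One operator whose generated ID changed, but whose old hash was pinned by the user.
        Vertex vertex = new Vertex(List.of("generated-1"), List.of("legacy-hash-1"));
        Set<String> savepointState = Set.of("legacy-hash-1");

        System.out.println(unmappedStates(savepointState, List.of(vertex), false)); // [legacy-hash-1]
        System.out.println(unmappedStates(savepointState, List.of(vertex), true));  // []
    }
}
```

In this model, the first call reports the state as unmapped (which is where the IllegalStateException would come from), while the second call finds a matching operator through the user-defined ID.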