Flink 1.11 checkpoint compatibility issue

Lu Niu
Hi,

We recently migrated from Flink 1.9.1 to Flink 1.11 and noticed that the new job cannot restore from a savepoint taken in 1.9.1. Below are the operator IDs and max parallelism values found in savepoints taken with each version. The only code change is the version upgrade.

savepoint 1.9.1:
```
Id: 8a74550ce6afad759d5f1d6212f43f4a, maxparallsim: 1024
Id: 21753033b264736cab2e32934441d610, maxparallsim: 4096
Id: e03cdfcd66012e06dc52531958e54e8d, maxparallsim: 1024
Id: d003b5c018424b83b771743563711891, maxparallsim: 900
Id: bb0026f9180b3842f4d781c5f7a4a88f, maxparallsim: 4096
Id: b0936afefebc629e050a0f423f44e6ba, maxparallsim: 4096
Id: 5b3f7c70ad9b86408e6af3715f928ad1, maxparallsim: 1024
Id: 278b3965ca58e95e78ab40884c5ddceb, maxparallsim: 900
Id: 6d0402ca998f4658c7632930a69811ac, maxparallsim: 1024
Id: 594970a50fc65ebd163a055fb972541e, maxparallsim: 900
Id: fba56b0a0ee00414d9913103a7c19ff7, maxparallsim: 4096
```

savepoint 1.11:
```
Id: 8a74550ce6afad759d5f1d6212f43f4a, maxparallsim: 900
Id: 21753033b264736cab2e32934441d610, maxparallsim: 900
Id: e03cdfcd66012e06dc52531958e54e8d, maxparallsim: 900
Id: d1bc8d10e5b8e98e55b2b6c5444f83c7, maxparallsim: 900
Id: d003b5c018424b83b771743563711891, maxparallsim: 900
Id: bb0026f9180b3842f4d781c5f7a4a88f, maxparallsim: 900
Id: b0936afefebc629e050a0f423f44e6ba, maxparallsim: 900
Id: 5b3f7c70ad9b86408e6af3715f928ad1, maxparallsim: 900
Id: 278b3965ca58e95e78ab40884c5ddceb, maxparallsim: 900
Id: 6d0402ca998f4658c7632930a69811ac, maxparallsim: 900
Id: 594970a50fc65ebd163a055fb972541e, maxparallsim: 900
Id: fba56b0a0ee00414d9913103a7c19ff7, maxparallsim: 900
```

In the code we call env.setMaxParallelism(900). It is strange that the 1.9.1 savepoint has different max parallelism values for different operators, and we don't know where 1024 and 4096 come from. Is it possible that these values are set by Flink itself?

Best
Lu
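
For readers following along, the two listings above can be compared mechanically. The sketch below is plain Java with no Flink dependency; the class and method names (`SavepointDiff`, `parse`, `diff`) are hypothetical and only assume the `Id: ..., maxparallsim: ...` line format shown in this post. As far as I know, Flink refuses to restore state when an explicitly configured max parallelism differs from the value recorded in the savepoint, so the operators this flags are the likely suspects. The sample in `main` is an abbreviated subset of the listings.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SavepointDiff {

    /** Parses lines of the form "Id: <operatorId>, maxparallsim: <n>" into operatorId -> n. */
    static Map<String, Integer> parse(String listing) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (String line : listing.split("\n")) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            String[] fields = trimmed.split(",");
            String id = fields[0].substring(fields[0].indexOf(':') + 1).trim();
            String value = fields[1].substring(fields[1].indexOf(':') + 1).trim();
            result.put(id, Integer.parseInt(value));
        }
        return result;
    }

    /** Lists operators that exist on only one side or whose max parallelism differs. */
    static List<String> diff(Map<String, Integer> oldSp, Map<String, Integer> newSp) {
        List<String> findings = new ArrayList<>();
        for (Map.Entry<String, Integer> e : oldSp.entrySet()) {
            Integer newValue = newSp.get(e.getKey());
            if (newValue == null) {
                findings.add(e.getKey() + ": only in old savepoint");
            } else if (!newValue.equals(e.getValue())) {
                findings.add(e.getKey() + ": " + e.getValue() + " -> " + newValue);
            }
        }
        for (String id : newSp.keySet()) {
            if (!oldSp.containsKey(id)) {
                findings.add(id + ": only in new savepoint");
            }
        }
        return findings;
    }

    public static void main(String[] args) {
        // Abbreviated subset of the listings from the post; paste the full listings to check all operators.
        String oldListing = String.join("\n",
                "Id: 8a74550ce6afad759d5f1d6212f43f4a, maxparallsim: 1024",
                "Id: 21753033b264736cab2e32934441d610, maxparallsim: 4096",
                "Id: d003b5c018424b83b771743563711891, maxparallsim: 900");
        String newListing = String.join("\n",
                "Id: 8a74550ce6afad759d5f1d6212f43f4a, maxparallsim: 900",
                "Id: 21753033b264736cab2e32934441d610, maxparallsim: 900",
                "Id: d1bc8d10e5b8e98e55b2b6c5444f83c7, maxparallsim: 900",
                "Id: d003b5c018424b83b771743563711891, maxparallsim: 900");
        diff(parse(oldListing), parse(newListing)).forEach(System.out::println);
    }
}
```

Run against the full listings, this reports eight operators whose max parallelism changed (from 1024 or 4096 to 900) and one operator ID, d1bc8d10e5b8e98e55b2b6c5444f83c7, that appears only in the 1.11 savepoint.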

Re: Flink 1.11 checkpoint compatibility issue

Matthias
Hi Lu,
thanks for reaching out to the community. Interesting observation. As far as I can tell, no change between 1.9.1 and 1.11 would explain this behavior. Have you had a chance to debug the code? Could you share it so that we can look into it more closely?
Another thing: are you using the Table API in your job? There might be some problems with setting the maxParallelism in the Table API.

Keep in mind that you could use the State Processor API [1] to adjust the maxParallelism per operator in a savepoint.

Best,
Matthias



Re: Flink 1.11 checkpoint compatibility issue

Arvid Heise-4
Hi Lu,

If you are using the DataStream API, make sure to set explicit uids for each operator. Only then is migrating savepoints to other major versions of Flink supported. [1]

Best,



Re: Flink 1.11 checkpoint compatibility issue

yidan zhao
I think you need to provide all of the parallelism information for operators such as 'Id: b0936afefebc629e050a0f423f44e6ba, maxparallsim: 4096'.
What is the parallelism of that operator? The max parallelism may have been derived from the parallelism you set.
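
To make that concrete: to my knowledge, when no max parallelism is configured for an operator, Flink derives one from the operator's parallelism in `KeyGroupRangeAssignment.computeDefaultMaxParallelism`, rounding 1.5 times the parallelism up to the next power of two and clamping the result to the range 128 to 32768. The sketch below re-implements that rule in plain Java as I understand it; it is not the Flink source, and the class name and the `parallelismRangeFor` helper are hypothetical.

```java
final class DefaultMaxParallelism {

    // Bounds as I recall them from Flink's KeyGroupRangeAssignment (assumption).
    static final int LOWER_BOUND = 1 << 7;  // 128
    static final int UPPER_BOUND = 1 << 15; // 32768

    /** Rounds a positive int up to the next power of two (returns x if it already is one). */
    static int roundUpToPowerOfTwo(int x) {
        x = x - 1;
        x |= x >> 1;
        x |= x >> 2;
        x |= x >> 4;
        x |= x >> 8;
        x |= x >> 16;
        return x + 1;
    }

    /** Default max parallelism derived from an operator's parallelism. */
    static int computeDefaultMaxParallelism(int parallelism) {
        int candidate = roundUpToPowerOfTwo(parallelism + (parallelism / 2));
        return Math.min(Math.max(candidate, LOWER_BOUND), UPPER_BOUND);
    }

    /** Returns {min, max} parallelism that would derive the given value, or null if none does. */
    static int[] parallelismRangeFor(int maxParallelism) {
        int first = -1;
        int last = -1;
        for (int p = 1; p <= UPPER_BOUND; p++) {
            if (computeDefaultMaxParallelism(p) == maxParallelism) {
                if (first < 0) {
                    first = p;
                }
                last = p;
            }
        }
        return first < 0 ? null : new int[] {first, last};
    }

    public static void main(String[] args) {
        for (int maxParallelism : new int[] {1024, 4096, 900}) {
            int[] range = parallelismRangeFor(maxParallelism);
            System.out.println(maxParallelism + " <- "
                    + (range == null ? "not a derived default" : range[0] + ".." + range[1]));
        }
    }
}
```

Under this rule, 1024 corresponds to an operator parallelism between 342 and 683 and 4096 to one between 1366 and 2731, whereas 900 is not a power of two and so can only come from an explicit setting. If the operators in the 1.9.1 job ran with parallelism in those ranges, that would suggest the values 1024 and 4096 were derived by Flink rather than taken from env.setMaxParallelism(900).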


Re: Flink 1.11 checkpoint compatibility issue

Lu Niu
Hi, 

Thanks all for replying.

1. The code uses the DataStream API only. We call env.setMaxParallelism() but never operator.setMaxParallelism(). We do call setParallelism() on each operator.
2. We did set uids for each operator, and the uids match between the two savepoints.
3. Regarding "What is the parallelism? The max parallelism may be generated from the parallelism you have set": could you elaborate? I don't quite understand how the max parallelism could be derived from the parallelism I set.

Best
Lu
