Re: need help about "incremental checkpoint", Thanks


Re: need help about "incremental checkpoint", Thanks

大森林
Where is the actual path?
I can only get one path from the web UI.

Is it possible that the error in step 5 is due to a fault in my code?

------------------ Original Message ------------------
From: "大森林" <[hidden email]>;
Date: Saturday, October 3, 2020, 9:13 AM
To: "David Anderson" <[hidden email]>;
Cc: "user" <[hidden email]>;
Subject: Re: need help about "incremental checkpoint", Thanks

Thanks~!!

I have compared your command with mine in step 5.
Mine is:
       "flink run -s hdfs://Desktop:9000/tmp/flinkck/df6d62a43620f258155b8538ef0ddf1b/chk-22 -c StateWordCount datastream_api-1.0-SNAPSHOT.jar"
Yours is:
$ bin/flink run -s hdfs://Desktop:9000/tmp/flinkck/1de98c1611c134d915d19ded33aeab54/chk-3 <jar file> [args]
They are the same.
Could you tell me where I am wrong?
------------------------------------------------------------------------------------------------------------------------
Maybe the error is not caused by this command?
"Unexpected state handle type, expected:
class org.apache.flink.runtime.state.KeyGroupsStateHandle,
but found:
class org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle"
----------------------------------------------------------------------------------------------------------------------------------

Thanks~

------------------ Original Message ------------------
From: "David Anderson" <[hidden email]>;
Date: Saturday, October 3, 2020, 12:05 AM
To: "大森林" <[hidden email]>;
Cc: "user" <[hidden email]>;
Subject: Re: need help about "incremental checkpoint", Thanks

If hdfs://Desktop:9000/tmp/flinkck/1de98c1611c134d915d19ded33aeab54/chk-3 was written by the RocksDbStateBackend, then you can use it to recover if the new job is also using the RocksDbStateBackend. The command would be

$ bin/flink run -s hdfs://Desktop:9000/tmp/flinkck/1de98c1611c134d915d19ded33aeab54/chk-3 <jar file> [args]

The ":" character is meant to indicate that you should not use the literal string "checkpointMetaDataPath", but rather replace that with the actual path. Do not include the : character.

David

On Fri, Oct 2, 2020 at 5:58 PM 大森林 <[hidden email]> wrote:

>
> I have read the official document
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/checkpoints.html#directory-structure
>
> At the end of the page linked above, it says:
>
> $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
>
> I tried the above command in a previous experiment, but still no luck.
> And why does the official command have ":" after "run -s"?
> I guess the ":" is not necessary.
>
> Could you tell me the right command to recover (resume) from an incremental checkpoint (RocksDbStateBackend)?
>
> Much Thanks~!
>
>
> ------------------ Original Message ------------------
> From: "大森林" <[hidden email]>;
> Date: Friday, October 2, 2020, 11:41 PM
> To: "David Anderson" <[hidden email]>;
> Cc: "user" <[hidden email]>;
> Subject: Re: need help about "incremental checkpoint", Thanks
>
> Thanks for your replies~!
>
> Could you tell me the right command to recover from a checkpoint manually using the RocksDB files?
>
> I understand that checkpoints are for automatic recovery,
> but in this experiment I stopped the job by force (by entering "error" four times in nc -lk 9999).
> Is there a way to recover from an incremental checkpoint manually (with the RocksDbStateBackend)?
>
> I can only find hdfs://Desktop:9000/tmp/flinkck/1de98c1611c134d915d19ded33aeab54/chk-3 in my web UI (I guess this is only used for the FsStateBackend).
>
> Thanks for your help~!
>
> ------------------ Original Message ------------------
> From: "David Anderson" <[hidden email]>;
> Date: Friday, October 2, 2020, 11:24 PM
> To: "大森林" <[hidden email]>;
> Cc: "user" <[hidden email]>;
> Subject: Re: need help about "incremental checkpoint", Thanks
>
>> Write in RocksDbStateBackend.
>> Read in FsStateBackend.
>> It's NOT a match.
>
>
> Yes, that is right. Also, this does not work:
>
> Write in FsStateBackend
> Read in RocksDbStateBackend
>
> For questions and support in Chinese, you can use the [hidden email] list. See the instructions at https://flink.apache.org/zh/community.html for how to join the list.
>
> Best,
> David
>
> On Fri, Oct 2, 2020 at 4:45 PM 大森林 <[hidden email]> wrote:
>>
>> Thanks for your replies~!
>>
>> My English is poor; my understanding of your replies is:
>>
>> Write in RocksDbStateBackend.
>> Read in FsStateBackend.
>> It's NOT a match.
>> So I'm wrong in step 5?
>> Is my above understanding right?
>>
>> Thanks for your help.
>>
>> ------------------ Original Message ------------------
>> From: "David Anderson" <[hidden email]>;
>> Date: Friday, October 2, 2020, 10:35 PM
>> To: "大森林" <[hidden email]>;
>> Cc: "user" <[hidden email]>;
>> Subject: Re: need help about "incremental checkpoint", Thanks
>>
>> It looks like you were trying to resume from a checkpoint taken with the FsStateBackend into a revised version of the job that uses the RocksDbStateBackend. Switching state backends in this way is not supported: checkpoints and savepoints are written in a state-backend-specific format, and can only be read by the same backend that wrote them.
>>
>> It is possible, however, to migrate between state backends using the State Processor API [1].
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>>
>> Best,
>> David
>>
>> On Fri, Oct 2, 2020 at 4:07 PM 大森林 <[hidden email]> wrote:
>>>
>>> I want to do an "incremental checkpoint" experiment.
>>>
>>> my code is:
>>>
>>> https://paste.ubuntu.com/p/DpTyQKq6Vk/
>>>
>>>  
>>>
>>> pom.xml is:
>>>
>>> <?xml version="1.0" encoding="UTF-8"?>
>>> <project xmlns="http://maven.apache.org/POM/4.0.0"
>>> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
>>> <modelVersion>4.0.0</modelVersion>
>>>
>>> <groupId>example</groupId>
>>> <artifactId>datastream_api</artifactId>
>>> <version>1.0-SNAPSHOT</version>
>>> <build>
>>> <plugins>
>>> <plugin>
>>> <groupId>org.apache.maven.plugins</groupId>
>>> <artifactId>maven-compiler-plugin</artifactId>
>>> <version>3.1</version>
>>> <configuration>
>>> <source>1.8</source>
>>> <target>1.8</target>
>>> </configuration>
>>> </plugin>
>>>
>>> <plugin>
>>> <groupId>org.scala-tools</groupId>
>>> <artifactId>maven-scala-plugin</artifactId>
>>> <version>2.15.2</version>
>>> <executions>
>>> <execution>
>>> <goals>
>>> <goal>compile</goal>
>>> <goal>testCompile</goal>
>>> </goals>
>>> </execution>
>>> </executions>
>>> </plugin>
>>>
>>>  
>>>
>>> </plugins>
>>> </build>
>>>
>>> <dependencies>
>>>
>>> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
>>> <dependency>
>>> <groupId>org.apache.flink</groupId>
>>> <artifactId>flink-streaming-scala_2.11</artifactId>
>>> <version>1.11.1</version>
>>> <!--<scope>provided</scope>-->
>>> </dependency>
>>>
>>> <!--
>>> <dependency>
>>> <groupId>org.apache.flink</groupId>
>>> <artifactId>flink-streaming-java_2.12</artifactId>
>>> <version>1.11.1</version>
>>> <scope>compile</scope>
>>> </dependency>
>>> -->
>>>
>>> <dependency>
>>> <groupId>org.apache.flink</groupId>
>>> <artifactId>flink-clients_2.11</artifactId>
>>> <version>1.11.1</version>
>>> </dependency>
>>>
>>>  
>>>
>>> <dependency>
>>> <groupId>org.apache.flink</groupId>
>>> <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
>>> <version>1.11.2</version>
>>> <!--<scope>test</scope>-->
>>> </dependency>
>>>
>>> <dependency>
>>> <groupId>org.apache.hadoop</groupId>
>>> <artifactId>hadoop-client</artifactId>
>>> <version>3.3.0</version>
>>> </dependency>
>>>
>>> <dependency>
>>> <groupId>org.apache.flink</groupId>
>>> <artifactId>flink-core</artifactId>
>>> <version>1.11.1</version>
>>> </dependency>
>>>
>>> <!--
>>> <dependency>
>>> <groupId>org.slf4j</groupId>
>>> <artifactId>slf4j-simple</artifactId>
>>> <version>1.7.25</version>
>>> <scope>compile</scope>
>>> </dependency>
>>> -->
>>>
>>> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-cep -->
>>> <dependency>
>>> <groupId>org.apache.flink</groupId>
>>> <artifactId>flink-cep_2.11</artifactId>
>>> <version>1.11.1</version>
>>> </dependency>
>>>
>>> <dependency>
>>> <groupId>org.apache.flink</groupId>
>>> <artifactId>flink-cep-scala_2.11</artifactId>
>>> <version>1.11.1</version>
>>> </dependency>
>>>
>>> <dependency>
>>> <groupId>org.apache.flink</groupId>
>>> <artifactId>flink-scala_2.11</artifactId>
>>> <version>1.11.1</version>
>>> </dependency>
>>>
>>>  
>>>
>>> <dependency>
>>> <groupId>org.projectlombok</groupId>
>>> <artifactId>lombok</artifactId>
>>> <version>1.18.4</version>
>>> <!--<scope>provided</scope>-->
>>> </dependency>
>>>
>>> </dependencies>
>>> </project>
>>>
>>>  
>>>
>>> the error I got is:
>>>
>>> https://paste.ubuntu.com/p/49HRYXFzR2/
>>>
>>>  
>>>
>>> some of the above error is:
>>>
>>> Caused by: java.lang.IllegalStateException: Unexpected state handle type, expected: class org.apache.flink.runtime.state.KeyGroupsStateHandle, but found: class org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle
>>>
>>>  
>>>
>>>  
>>>
>>> The steps are:
>>>
>>> 1.mvn clean scala:compile compile package
>>>
>>> 2.nc -lk 9999
>>>
>>> 3.flink run -c wordcount_increstate  datastream_api-1.0-SNAPSHOT.jar
>>> Job has been submitted with JobID df6d62a43620f258155b8538ef0ddf1b
>>>
>>> 4. Input the following contents in nc -lk 9999:
>>>
>>> before
>>> error
>>> error
>>> error
>>> error
>>>
>>> 5.
>>>
>>> flink run -s hdfs://Desktop:9000/tmp/flinkck/df6d62a43620f258155b8538ef0ddf1b/chk-22 -c StateWordCount datastream_api-1.0-SNAPSHOT.jar
>>>
>>> Then the above error happens.
>>>
>>>  
>>>
>>> Please help,Thanks~!
>>>
>>>
>>> I have tried to subscribe to [hidden email],
>>>
>>> but got no replies. If possible, please send your replies to [hidden email] as well, thanks.
>>>
>>>  
Reply | Threaded
Open this post in threaded view
|

Re: Re: need help about "incremental checkpoint", Thanks

Yun Tang
Hi

First of all, as David said, the reason you get "Unexpected state handle type, expected: class org.apache.flink.runtime.state.KeyGroupsStateHandle, but found: class org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle" is that you are resuming with the FsStateBackend from a checkpoint written by the RocksDB state backend.
Have you changed your code or updated any configuration related to state.backend [1]?

Secondly, apart from getting the checkpoint path via the web UI, a more accurate way is to list the folder hdfs://Desktop:9000/tmp/flinkck/df6d62a43620f258155b8538ef0ddf1b/ to see which chk-x folder contains a file named _metadata [2]; that is the folder from which you should resume.
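On the cluster itself, this check is usually done with `hdfs dfs -ls` against the path above. As a local illustration of the same idea, the sketch below scans a checkpoint directory for chk-* folders that contain a _metadata file; the directory names in the demo are made up, and it assumes the checkpoint directory has been copied or mounted locally:

```python
import os
import tempfile

def find_resumable_checkpoints(job_dir):
    """Return the chk-* subdirectories of a Flink job checkpoint
    directory that contain a _metadata file, i.e. completed
    checkpoints that can be passed to `flink run -s`."""
    found = []
    for name in sorted(os.listdir(job_dir)):  # lexicographic order
        path = os.path.join(job_dir, name)
        if (name.startswith("chk-")
                and os.path.isdir(path)
                and os.path.isfile(os.path.join(path, "_metadata"))):
            found.append(path)
    return found

# Demo with a fake local layout mimicking
# hdfs://Desktop:9000/tmp/flinkck/<job-id>/
with tempfile.TemporaryDirectory() as job_dir:
    os.makedirs(os.path.join(job_dir, "chk-21"))  # incomplete: no _metadata
    os.makedirs(os.path.join(job_dir, "chk-22"))
    open(os.path.join(job_dir, "chk-22", "_metadata"), "w").close()
    os.makedirs(os.path.join(job_dir, "shared"))  # incremental-state files live here
    print([os.path.basename(p) for p in find_resumable_checkpoints(job_dir)])
    # -> ['chk-22']
```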



Best
Yun Tang

From: 大森林 <[hidden email]>
Sent: Saturday, October 3, 2020 9:30
To: David Anderson <[hidden email]>
Cc: user <[hidden email]>
Subject: Re: need help about "incremental checkpoint", Thanks
 

Re: need help about "incremental checkpoint", Thanks

大森林
In reply to this post by 大森林
Could you give more details?
Thanks



Re: need help about "incremental checkpoint",Thanks

David Anderson-4
This error comes because you changed state backends. The checkpoint was written by a different state backend. This is not supported.

To use incremental checkpoints, you must only use the RocksDbStateBackend: first, when running the job and writing the checkpoint, and again later when restarting.
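One way to keep the backend consistent between the run that writes the checkpoint and the run that restores it is to pin it in the cluster configuration rather than in job code. A minimal flink-conf.yaml fragment for Flink 1.11 is sketched below; the HDFS path is the one used in this thread, so adjust it to your setup:

```yaml
# flink-conf.yaml: use the same backend for writing and restoring
state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: hdfs://Desktop:9000/tmp/flinkck
```

With the backend fixed here, a job that sets no backend in code will both write and resume its incremental checkpoints with RocksDB.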

Best,
David


Re: need help about "incremental checkpoint",Thanks

大森林
I don't know where I changed the state backends.

There are two meanings of "restarting":
① Restarting automatically (this succeeded in my experiment)
② Restarting manually (this failed in my experiment)

The whole experiment (just a word-count case) and its steps are listed in my GitHub repository: https://github.com/appleyuchi/wrongcheckpoint
Could you spare some time to check it?
Thanks for your help~!



Re: need help about "incremental checkpoint",Thanks

Aljoscha Krettek
I'm forwarding my comment from the Jira issue [1]:

In https://github.com/appleyuchi/wrongcheckpoint/blob/master/src/main/scala/wordcount_increstate.scala you set the RocksDBStateBackend, while in https://github.com/appleyuchi/wrongcheckpoint/blob/master/src/main/scala/StateWordCount.scala you set the FsStateBackend. This will not work because the RocksDB savepoint is not compatible.

Best,
Aljoscha

[1] https://issues.apache.org/jira/browse/FLINK-19486
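
In other words, both the job that writes the checkpoint and the job that restores from it must construct the same backend. A minimal sketch of what both main classes would share, assuming the Flink 1.11 Scala API and the HDFS path used in this thread (not the poster's actual code):

```scala
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object StateWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Writer and restorer must both use RocksDB; the second argument
    // (`true`) enables incremental checkpointing.
    env.setStateBackend(
      new RocksDBStateBackend("hdfs://Desktop:9000/tmp/flinkck", true))
    env.enableCheckpointing(1000) // checkpoint every second
    // ... the word-count pipeline (socketTextStream, keyBy, sum, print) ...
    env.execute("StateWordCount")
  }
}
```

If `wordcount_increstate` sets up the backend this way but `StateWordCount` calls `new FsStateBackend(...)` instead, the restore fails with exactly the `IncrementalRemoteKeyedStateHandle` error quoted above.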

> &gt;&gt;&gt; <artifactId&gt;lombok</artifactId&gt;
> &gt;&gt;&gt; <version&gt;1.18.4</version&gt;
> &gt;&gt;&gt; <!-<scope&gt;provided</scope&gt;-&gt;
> &gt;&gt;&gt; </dependency&gt;
> &gt;&gt;&gt;
> &gt;&gt;&gt; </dependencies&gt;
> &gt;&gt;&gt; </project&gt;
> &gt;&gt;&gt;
> &gt;&gt;&gt; &nbsp;
> &gt;&gt;&gt;
> &gt;&gt;&gt; the error I got is:
> &gt;&gt;&gt;
> &gt;&gt;&gt; https://paste.ubuntu.com/p/49HRYXFzR2/
> &gt;&gt;&gt;
> &gt;&gt;&gt; &nbsp;
> &gt;&gt;&gt;
> &gt;&gt;&gt; some of the above error is:
> &gt;&gt;&gt;
> &gt;&gt;&gt; Caused by: java.lang.IllegalStateException: Unexpected state handle type, expected: class org.apache.flink.runtime.state.KeyGroupsStateHandle, but found: class org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle
> &gt;&gt;&gt;
> &gt;&gt;&gt; &nbsp;
> &gt;&gt;&gt;
> &gt;&gt;&gt; &nbsp;
> &gt;&gt;&gt;
> &gt;&gt;&gt; The steps are:
> &gt;&gt;&gt;
> &gt;&gt;&gt; 1.mvn clean scala:compile compile package
> &gt;&gt;&gt;
> &gt;&gt;&gt; 2.nc -lk 9999
> &gt;&gt;&gt;
> &gt;&gt;&gt; 3.flink run -c wordcount_increstate &nbsp;datastream_api-1.0-SNAPSHOT.jar
> &gt;&gt;&gt; Job has been submitted with JobID df6d62a43620f258155b8538ef0ddf1b
> &gt;&gt;&gt;
> &gt;&gt;&gt; 4.input the following conents in nc -lk 9999
> &gt;&gt;&gt;
> &gt;&gt;&gt; before
> &gt;&gt;&gt; error
> &gt;&gt;&gt; error
> &gt;&gt;&gt; error
> &gt;&gt;&gt; error
> &gt;&gt;&gt;
> &gt;&gt;&gt; 5.
> &gt;&gt;&gt;
> &gt;&gt;&gt; flink run -s hdfs://Desktop:9000/tmp/flinkck/df6d62a43620f258155b8538ef0ddf1b/chk-22 -c StateWordCount datastream_api-1.0-SNAPSHOT.jar
> &gt;&gt;&gt;
> &gt;&gt;&gt; Then the above error happens.
> &gt;&gt;&gt;
> &gt;&gt;&gt; &nbsp;
> &gt;&gt;&gt;
> &gt;&gt;&gt; Please help,Thanks~!
> &gt;&gt;&gt;
> &gt;&gt;&gt;
> &gt;&gt;&gt; I have tried to subscried to [hidden email];
> &gt;&gt;&gt;
> &gt;&gt;&gt; but no replies.If possible ,send to [hidden email] with your valuable replies,thanks.
> &gt;&gt;&gt;
> &gt;&gt;&gt; &nbsp;
>


Re: need help about "incremental checkpoint", Thanks

大森林
In reply to this post by 大森林

I have solved it.
I was resuming from the wrong class in the jar,
while the jar contains two different experiments.

Thanks for your help.
Please close my issue.

Much, much thanks!
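
For readers who hit the same IllegalStateException: the resolution above means the restore command must name the entry class whose job actually wrote the checkpoint. A sketch using the job ID and paths from this thread (here chk-22 was written by the wordcount_increstate job, while the failing step 5 had passed -c StateWordCount):

```shell
# Resume with the same entry class (and hence the same state backend
# configuration) that wrote chk-22; resuming with the other class from
# the same jar produced the state-handle type mismatch:
flink run \
  -s hdfs://Desktop:9000/tmp/flinkck/df6d62a43620f258155b8538ef0ddf1b/chk-22 \
  -c wordcount_increstate \
  datastream_api-1.0-SNAPSHOT.jar
```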
------------------ Original Message ------------------
From: "大森林" <[hidden email]>;
Sent: Tuesday, October 6, 2020, 5:02 PM
To: "David Anderson"<[hidden email]>;
Cc: "user"<[hidden email]>;
Subject: Re: need help about "incremental checkpoint", Thanks

I don't know where I changed the state backends.

There are two meanings of "restarting":
restarting automatically (which succeeded in my experiment), and
restarting manually (which failed in my experiment).

The whole experiment (just a word-count case) and the steps are listed on my GitHub:
Could you spare some time to check it?
Thanks for your help~!


------------------ Original Message ------------------
From: "David Anderson" <[hidden email]>;
Sent: Tuesday, October 6, 2020, 4:32 PM
To: "大森林"<[hidden email]>;
Cc: "user"<[hidden email]>;
Subject: Re: need help about "incremental checkpoint", Thanks

This error comes because you changed state backends. The checkpoint was written by a different state backend. This is not supported.

To use incremental checkpoints, you must only use the RocksDbStateBackend: first, when running the job and writing the checkpoint, and again later when restarting.
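
One way to guarantee the backend stays the same across the writing run and the restore is to pin it cluster-wide instead of (or in addition to) setting it in code. This is a sketch against the Flink 1.11 configuration keys, using the checkpoint directory from this thread:

```yaml
# flink-conf.yaml (Flink 1.11): every job on the cluster uses the same backend
state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: hdfs://Desktop:9000/tmp/flinkck
```

With this in place, a job resumed via `flink run -s …` uses the same RocksDB backend that wrote the checkpoint, so a backend switch cannot cause the KeyGroupsStateHandle / IncrementalRemoteKeyedStateHandle mismatch (though, as it turned out in this thread, resuming with the wrong entry class can produce the same error).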

Best,
David

On Mon, Oct 5, 2020 at 2:38 PM 大森林 <[hidden email]> wrote:
Could you give more details?
Thanks


------------------ Original Message ------------------
From: "大森林" <[hidden email]>;
Sent: Saturday, October 3, 2020, 9:30 AM
To: "David Anderson"<[hidden email]>;
Cc: "user"<[hidden email]>;
Subject: Re: need help about "incremental checkpoint", Thanks

Where's the actual path?
I can only get one path from the Web UI.

Is it possible that the error in step 5 is due to a fault in my code?

------------------ Original Message ------------------
From: "大森林" <[hidden email]>;
Sent: Saturday, October 3, 2020, 9:13 AM
To: "David Anderson"<[hidden email]>;
Cc: "user"<[hidden email]>;
Subject: Re: need help about "incremental checkpoint", Thanks

Thanks~!!

I have compared your command with mine in step 5.
Mine is:
       "flink run -s hdfs://Desktop:9000/tmp/flinkck/df6d62a43620f258155b8538ef0ddf1b/chk-22 -c StateWordCount datastream_api-1.0-SNAPSHOT.jar"
Yours is:
$ bin/flink run -s hdfs://Desktop:9000/tmp/flinkck/1de98c1611c134d915d19ded33aeab54/chk-3 <jar file> [args]
They are the same.
Could you tell me where I am wrong?
------------------------------------------------------------------------------------------------------------------------
Maybe the error is not caused by this command?
"Unexpected state handle type, expected:
class org.apache.flink.runtime.state.KeyGroupsStateHandle,
but found:
class org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle"
----------------------------------------------------------------------------------------------------------------------------------

Thanks~

------------------ Original Message ------------------
From: "David Anderson" <[hidden email]>;
Sent: Saturday, October 3, 2020, 12:05 AM
To: "大森林"<[hidden email]>;
Cc: "user"<[hidden email]>;
Subject: Re: need help about "incremental checkpoint", Thanks

If hdfs://Desktop:9000/tmp/flinkck/1de98c1611c134d915d19ded33aeab54/chk-3 was written by the RocksDbStateBackend, then you can use it to recover if the new job is also using the RocksDbStateBackend. The command would be

$ bin/flink run -s hdfs://Desktop:9000/tmp/flinkck/1de98c1611c134d915d19ded33aeab54/chk-3 <jar file> [args]

The ":" character is meant to indicate that you should not use the literal string "checkpointMetaDataPath", but rather replace that with the actual path. Do not include the : character.
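
Concretely, with the placeholder substituted, the command looks like this. The job ID and HDFS paths are the ones from this thread; the directory layout in the comments is the general scheme described in the Flink checkpoints documentation, not output captured from this cluster:

```shell
# Inspect the job's checkpoint directory; each retained checkpoint is a
# chk-<n> subdirectory, and incremental RocksDB state lives under shared/:
hdfs dfs -ls -R hdfs://Desktop:9000/tmp/flinkck/df6d62a43620f258155b8538ef0ddf1b
#   .../chk-22/_metadata   <- metadata read by "flink run -s"
#   .../shared/            <- sst files shared across incremental checkpoints
#   .../taskowned/

# Resume: pass the chk-<n> path directly after -s, with no leading ":":
bin/flink run -s hdfs://Desktop:9000/tmp/flinkck/df6d62a43620f258155b8538ef0ddf1b/chk-22 <jar file> [args]
```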

David

On Fri, Oct 2, 2020 at 5:58 PM 大森林 <[hidden email]> wrote:

>
> I have read the official document
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/checkpoints.html#directory-structure
>
> at the end of above link,it said:
>
> $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
>
> I have tried the above command in a previous experiment, but still no luck.
> And why does the above official command have ":" after "run -s"?
> I guess the ":" is not necessary.
>
> Could you tell me what the right command is to recover (resume) from an incremental checkpoint (RocksDbStateBackend)?
>
> Much Thanks~!
>
>
> ------------------ Original Message ------------------
> From: "大森林" <[hidden email]>;
> Sent: Friday, October 2, 2020, 11:41 PM
> To: "David Anderson"<[hidden email]>;
> Cc: "user"<[hidden email]>;
> Subject: Re: need help about "incremental checkpoint", Thanks
>
> Thanks for your replies~!
>
> Could you tell me what the right command is to recover from a checkpoint manually, using the RocksDB files?
>
> I understand that checkpointing is for automatic recovery,
> but in this experiment I stopped the job by force (by feeding four "error" lines into nc -lk 9999).
> Is there a way to recover from an incremental checkpoint manually (with RocksdbStateBackend)?
>
> I can only find hdfs://Desktop:9000/tmp/flinkck/1de98c1611c134d915d19ded33aeab54/chk-3 in my Web UI (I guess this is only used for FsStateBackend).
>
> Thanks for your help~!
>
> ------------------ Original Message ------------------
> From: "David Anderson" <[hidden email]>;
> Sent: Friday, October 2, 2020, 11:24 PM
> To: "大森林"<[hidden email]>;
> Cc: "user"<[hidden email]>;
> Subject: Re: need help about "incremental checkpoint", Thanks
>
>> Write in RocksDbStateBackend.
>> Read in FsStateBackend.
>> It's NOT a match.
>
>
> Yes, that is right. Also, this does not work:
>
> Write in FsStateBackend
> Read in RocksDbStateBackend
>
> For questions and support in Chinese, you can use the [hidden email]. See the instructions at https://flink.apache.org/zh/community.html for how to join the list.
>
> Best,
> David
>
> On Fri, Oct 2, 2020 at 4:45 PM 大森林 <[hidden email]> wrote:
>>
>> Thanks for your replies~!
>>
>> My English is poor; here is my understanding of your replies:
>>
>> Write in RocksDbStateBackend.
>> Read in FsStateBackend.
>> It's NOT a match.
>> So I'm wrong in step 5?
>> Is my above understanding right?
>>
>> Thanks for your help.
>>
>> ------------------ Original Message ------------------
>> From: "David Anderson" <[hidden email]>;
>> Sent: Friday, October 2, 2020, 10:35 PM
>> To: "大森林"<[hidden email]>;
>> Cc: "user"<[hidden email]>;
>> Subject: Re: need help about "incremental checkpoint", Thanks
>>
>> It looks like you were trying to resume from a checkpoint taken with the FsStateBackend into a revised version of the job that uses the RocksDbStateBackend. Switching state backends in this way is not supported: checkpoints and savepoints are written in a state-backend-specific format, and can only be read by the same backend that wrote them.
>>
>> It is possible, however, to migrate between state backends using the State Processor API [1].
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>>
>> Best,
>> David
>>
>> On Fri, Oct 2, 2020 at 4:07 PM 大森林 <[hidden email]> wrote:
>>>
>>> I want to do an experiment with "incremental checkpoint".
>>>
>>> my code is:
>>>
>>> https://paste.ubuntu.com/p/DpTyQKq6Vk/
>>>
>>>  
>>>
>>> pom.xml is:
>>>
>>> <?xml version="1.0" encoding="UTF-8"?>
>>> <project xmlns="http://maven.apache.org/POM/4.0.0"
>>> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
>>> <modelVersion>4.0.0</modelVersion>
>>>
>>> <groupId>example</groupId>
>>> <artifactId>datastream_api</artifactId>
>>> <version>1.0-SNAPSHOT</version>
>>> <build>
>>> <plugins>
>>> <plugin>
>>> <groupId>org.apache.maven.plugins</groupId>
>>> <artifactId>maven-compiler-plugin</artifactId>
>>> <version>3.1</version>
>>> <configuration>
>>> <source>1.8</source>
>>> <target>1.8</target>
>>> </configuration>
>>> </plugin>
>>>
>>> <plugin>
>>> <groupId>org.scala-tools</groupId>
>>> <artifactId>maven-scala-plugin</artifactId>
>>> <version>2.15.2</version>
>>> <executions>
>>> <execution>
>>> <goals>
>>> <goal>compile</goal>
>>> <goal>testCompile</goal>
>>> </goals>
>>> </execution>
>>> </executions>
>>> </plugin>
>>>
>>>  
>>>
>>> </plugins>
>>> </build>
>>>
>>> <dependencies>
>>>
>>> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
>>> <dependency>
>>> <groupId>org.apache.flink</groupId>
>>> <artifactId>flink-streaming-scala_2.11</artifactId>
>>> <version>1.11.1</version>
>>> <!--<scope>provided</scope>-->
>>> </dependency>
>>>
>>> <!--<dependency>-->
>>> <!--<groupId>org.apache.flink</groupId>-->
>>> <!--<artifactId>flink-streaming-java_2.12</artifactId>-->
>>> <!--<version>1.11.1</version>-->
>>> <!--<scope>compile</scope>-->
>>> <!--</dependency>-->
>>>
>>> <dependency>
>>> <groupId>org.apache.flink</groupId>
>>> <artifactId>flink-clients_2.11</artifactId>
>>> <version>1.11.1</version>
>>> </dependency>
>>>
>>>  
>>>
>>> <dependency>
>>> <groupId>org.apache.flink</groupId>
>>> <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
>>> <version>1.11.2</version>
>>> <!--<scope>test</scope>-->
>>> </dependency>
>>>
>>> <dependency>
>>> <groupId>org.apache.hadoop</groupId>
>>> <artifactId>hadoop-client</artifactId>
>>> <version>3.3.0</version>
>>> </dependency>
>>>
>>> <dependency>
>>> <groupId>org.apache.flink</groupId>
>>> <artifactId>flink-core</artifactId>
>>> <version>1.11.1</version>
>>> </dependency>
>>>
>>> <!--<dependency>-->
>>> <!--<groupId>org.slf4j</groupId>-->
>>> <!--<artifactId>slf4j-simple</artifactId>-->
>>> <!--<version>1.7.25</version>-->
>>> <!--<scope>compile</scope>-->
>>> <!--</dependency>-->
>>>
>>> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-cep -->
>>> <dependency>
>>> <groupId>org.apache.flink</groupId>
>>> <artifactId>flink-cep_2.11</artifactId>
>>> <version>1.11.1</version>
>>> </dependency>
>>>
>>> <dependency>
>>> <groupId>org.apache.flink</groupId>
>>> <artifactId>flink-cep-scala_2.11</artifactId>
>>> <version>1.11.1</version>
>>> </dependency>
>>>
>>> <dependency>
>>> <groupId>org.apache.flink</groupId>
>>> <artifactId>flink-scala_2.11</artifactId>
>>> <version>1.11.1</version>
>>> </dependency>
>>>
>>>  
>>>
>>> <dependency>
>>> <groupId>org.projectlombok</groupId>
>>> <artifactId>lombok</artifactId>
>>> <version>1.18.4</version>
>>> <!--<scope>provided</scope>-->
>>> </dependency>
>>>
>>> </dependencies>
>>> </project>
>>>
>>>  
>>>
>>> the error I got is:
>>>
>>> https://paste.ubuntu.com/p/49HRYXFzR2/
>>>
>>>  
>>>
>>> some of the above error is:
>>>
>>> Caused by: java.lang.IllegalStateException: Unexpected state handle type, expected: class org.apache.flink.runtime.state.KeyGroupsStateHandle, but found: class org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle
>>>
>>>  
>>>
>>>  
>>>
>>> The steps are:
>>>
>>> 1. mvn clean scala:compile compile package
>>>
>>> 2. nc -lk 9999
>>>
>>> 3. flink run -c wordcount_increstate datastream_api-1.0-SNAPSHOT.jar
>>> Job has been submitted with JobID df6d62a43620f258155b8538ef0ddf1b
>>>
>>> 4. Input the following contents in nc -lk 9999:
>>>
>>> before
>>> error
>>> error
>>> error
>>> error
>>>
>>> 5.
>>>
>>> flink run -s hdfs://Desktop:9000/tmp/flinkck/df6d62a43620f258155b8538ef0ddf1b/chk-22 -c StateWordCount datastream_api-1.0-SNAPSHOT.jar
>>>
>>> Then the above error happens.
>>>
>>>  
>>>
>>> Please help, thanks~!
>>>
>>>
>>> I have tried to subscribe to [hidden email],
>>>
>>> but got no replies. If possible, please send your valuable replies to [hidden email]. Thanks.
>>>
>>>