trying to externalize checkpoint to s3


trying to externalize checkpoint to s3

Sathi Chowdhury

We are running Flink 1.2 in pre-production.

I am trying to test checkpoints stored in an external location on S3.

 

I have set the following in flink-conf.yaml:

 

state.backend: filesystem

state.checkpoints.dir: s3://abc-checkpoint

state.backend.fs.checkpointdir: s3://abc-checkpoint

 

I get this failure in the job manager log:

java.lang.Exception: Cannot initialize File System State Backend with URI 's3://abc-checkpoint.

        at org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:57)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.createStateBackend(StreamTask.java:719)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:223)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)

        at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.IllegalArgumentException: Cannot use the root directory for checkpoints.

        at org.apache.flink.runtime.state.filesystem.FsStateBackend.validateAndNormalizeUri(FsStateBackend.java:225)

        at org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:153)

        at org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:5

Any clue? I thought that, since I am using EMR, the Hadoop-to-S3 integration was already working.

Thanks
Sathi

Re: trying to externalize checkpoint to s3

SHI Xiaogang
Hi Sathi,

According to the URI format specification, "abc-checkpoint" is the host name in the given URI and the path is null. Therefore, FsStateBackend complains about using the root directory.

Maybe "s3:///abc-checkpoint" ("///" instead of "//") is the uri that you want to use. It will put all checkpoints under the path "/abc-checkpoint".

Regards,
Xiaogang



Re: trying to externalize checkpoint to s3

Ted Yu
In reply to this post by Sathi Chowdhury
Adding back user@

Please check the hadoop-common jar in the classpath.
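As a rough way to compare versions on an EMR node (the paths below are assumptions and may differ per setup):

# Hadoop jars provided by the cluster
find /usr/lib/hadoop -name 'hadoop-common-*.jar'
# Hadoop classes bundled with the Flink distribution / fat jar
ls $FLINK_HOME/lib | grep -i hadoop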

On Mon, May 22, 2017 at 7:12 PM, Sathi Chowdhury <[hidden email]> wrote:

Tried it,

It does not fail like before, but a new error popped up; it looks like a jar problem (clash) to me.

Thanks

java.lang.Exception: Error while triggering checkpoint 2 for Source: Custom Source (1/1)

        at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1120)

        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

        at java.util.concurrent.FutureTask.run(FutureTask.java:266)

        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

        at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.NoSuchMethodError: org.apache.hadoop.conf.Configuration.addResource(Lorg/apache/hadoop/conf/Configuration;)V

        at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.initialize(EmrFileSystem.java:95)

        at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:320)

        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:271)

        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:310)

        at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)

        at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.<init>(FsCheckpointStreamFactory.java:105)

        at org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:172)

        at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.createStreamFactory(StreamTask.java:1155)

        at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1137)

        at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1076)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:641)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:586)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:529)

        at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1111)

        ... 5 common frames omitted

 

 

From: Ted Yu <[hidden email]>
Date: Monday, May 22, 2017 at 6:52 PM
To: Sathi Chowdhury <[hidden email]>
Subject: Re: trying to externalize checkpoint to s3

 

Have you tried specifying subdirectory such as the following ?

 

s3://abc-checkpoint/subdir
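
For instance, the checkpoint settings from the original mail could point at a subdirectory instead of the bucket root (the directory name here is just a placeholder):

state.backend: filesystem
state.checkpoints.dir: s3://abc-checkpoint/checkpoints
state.backend.fs.checkpointdir: s3://abc-checkpoint/checkpoints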

 


Re: trying to externalize checkpoint to s3

Till Rohrmann
Hi Sathi,

The last error indicates that you are running Flink on a cluster with an incompatible Hadoop version. Please make sure that you use/build Flink with the Hadoop version you have running on your cluster. In particular, make sure that the Hadoop version Flink is built for is compatible with the respective EMR version.
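
For example, assuming the EMR cluster runs Hadoop 2.7.3 (substitute the actual version), Flink can be built against a matching Hadoop version with:

mvn clean install -DskipTests -Dhadoop.version=2.7.3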

Cheers,
Till 


Re: trying to externalize checkpoint to s3

Sathi Chowdhury

Hi Till, thanks for your reply. I have to try out my fat jar not including the Hadoop classes as well.
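
A minimal sketch of that, assuming a Maven build: marking the Hadoop dependencies as provided keeps them out of the fat jar, so the cluster's own Hadoop classes are used at runtime (artifact and version property are placeholders):

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
    <scope>provided</scope>
</dependency>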

 
