Stateful Stream Processing with RocksDB causing Job failure

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

Stateful Stream Processing with RocksDB causing Job failure

Abiy Legesse Hailemichael
I am running a standalone flink cluster (1.1.2) and I have a stateful streaming job that uses RocksDB as a state manager. I have two stateful operators that are using ValueState<> and ListState<>. Every now and then my job fails with the following exception

java.lang.Exception: Could not restore checkpointed state to operators and functions
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:552)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:250)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: File file:/data/flink/checkpoints/226c84df02e47d1b9c036ba894503145/StreamMap_12_5/dummy_state/chk-83 does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:609)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:822)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:599)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
	at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:337)
	at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:289)
	at org.apache.hadoop.fs.LocalFileSystem.copyToLocalFile(LocalFileSystem.java:88)
	at org.apache.hadoop.fs.FileSystem.copyToLocalFile(FileSystem.java:1975)
	at org.apache.flink.streaming.util.HDFSCopyToLocal$1.run(HDFSCopyToLocal.java:48)

Can someone help me with this, Is this  a known issue ?

Thanks

Abiy Hailemichael
Software Engineer
Email: [hidden email]

Reply | Threaded
Open this post in threaded view
|

Re: Stateful Stream Processing with RocksDB causing Job failure

Ufuk Celebi
Hey Abiy!

- Do all the task managers run on a single host? Only then using the
local file system will work.

- What does every now and then mean? Every time when the job tries to
take a snapshot? After restarts?

The JobManager logs will also help if we can't figure this out like this.

Best,

Ufuk

On Tue, Dec 20, 2016 at 6:05 PM, Abiy Legesse Hailemichael
<[hidden email]> wrote:

> I am running a standalone flink cluster (1.1.2) and I have a stateful
> streaming job that uses RocksDB as a state manager. I have two stateful
> operators that are using ValueState<> and ListState<>. Every now and then my
> job fails with the following exception
>
> java.lang.Exception: Could not restore checkpointed state to operators and
> functions
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:552)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:250)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: File
> file:/data/flink/checkpoints/226c84df02e47d1b9c036ba894503145/StreamMap_12_5/dummy_state/chk-83
> does not exist
> at
> org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:609)
> at
> org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:822)
> at
> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:599)
> at
> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
> at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:337)
> at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:289)
> at
> org.apache.hadoop.fs.LocalFileSystem.copyToLocalFile(LocalFileSystem.java:88)
> at org.apache.hadoop.fs.FileSystem.copyToLocalFile(FileSystem.java:1975)
> at
> org.apache.flink.streaming.util.HDFSCopyToLocal$1.run(HDFSCopyToLocal.java:48)
>
>
> Can someone help me with this, Is this  a known issue ?
>
> Thanks
>
> Abiy Hailemichael
> Software Engineer
> Email: [hidden email]
>
Reply | Threaded
Open this post in threaded view
|

Re: Stateful Stream Processing with RocksDB causing Job failure

Fabian Hueske-2
Copying my reply from the other thread with the same issue to have the discussion in one place.

------

Hi Abiy,

to which type of filesystem are you persisting your checkpoints?

We have seen problems with S3 and its consistency model. These issues have been addressed in newer versions of Flink.
Not sure if the fix went into 1.1.3 already but release 1.1.4 is currently voted on and has tons of other bug fixes as well.
I would suggest to upgrade to 1.1.3 or even 1.1.4 once it is released (should happen in a few days if no regression is found).

Best, Fabian

2016-12-21 10:12 GMT+01:00 Ufuk Celebi <[hidden email]>:
Hey Abiy!

- Do all the task managers run on a single host? Only then using the
local file system will work.

- What does every now and then mean? Every time when the job tries to
take a snapshot? After restarts?

The JobManager logs will also help if we can't figure this out like this.

Best,

Ufuk

On Tue, Dec 20, 2016 at 6:05 PM, Abiy Legesse Hailemichael
<[hidden email]> wrote:
> I am running a standalone flink cluster (1.1.2) and I have a stateful
> streaming job that uses RocksDB as a state manager. I have two stateful
> operators that are using ValueState<> and ListState<>. Every now and then my
> job fails with the following exception
>
> java.lang.Exception: Could not restore checkpointed state to operators and
> functions
>       at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:552)
>       at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:250)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: File
> file:/data/flink/checkpoints/226c84df02e47d1b9c036ba894503145/StreamMap_12_5/dummy_state/chk-83
> does not exist
>       at
> org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:609)
>       at
> org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:822)
>       at
> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:599)
>       at
> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
>       at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:337)
>       at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:289)
>       at
> org.apache.hadoop.fs.LocalFileSystem.copyToLocalFile(LocalFileSystem.java:88)
>       at org.apache.hadoop.fs.FileSystem.copyToLocalFile(FileSystem.java:1975)
>       at
> org.apache.flink.streaming.util.HDFSCopyToLocal$1.run(HDFSCopyToLocal.java:48)
>
>
> Can someone help me with this, Is this  a known issue ?
>
> Thanks
>
> Abiy Hailemichael
> Software Engineer
> Email: [hidden email]
>

Reply | Threaded
Open this post in threaded view
|

Re: Stateful Stream Processing with RocksDB causing Job failure

Abiy Legesse Hailemichael
Thanks for the prompt response.

I am persisting checkpoints to local file system. I have three node cluster, task managers running on two different hosts. At times the job runs for days with no issue but recently it started to fail when it takes snapshots. 

So the RocksDB state backend should only be used in conjunctions with hdfs or s3 on a multi node cluster? 


Thanks,
Abiy 

On Dec 21, 2016, at 4:25 AM, Fabian Hueske <[hidden email]> wrote:

Copying my reply from the other thread with the same issue to have the discussion in one place.

------

Hi Abiy,

to which type of filesystem are you persisting your checkpoints?

We have seen problems with S3 and its consistency model. These issues have been addressed in newer versions of Flink.
Not sure if the fix went into 1.1.3 already but release 1.1.4 is currently voted on and has tons of other bug fixes as well.
I would suggest to upgrade to 1.1.3 or even 1.1.4 once it is released (should happen in a few days if no regression is found).

Best, Fabian

2016-12-21 10:12 GMT+01:00 Ufuk Celebi <[hidden email]>:
Hey Abiy!

- Do all the task managers run on a single host? Only then using the
local file system will work.

- What does every now and then mean? Every time when the job tries to
take a snapshot? After restarts?

The JobManager logs will also help if we can't figure this out like this.

Best,

Ufuk

On Tue, Dec 20, 2016 at 6:05 PM, Abiy Legesse Hailemichael
<[hidden email]> wrote:
> I am running a standalone flink cluster (1.1.2) and I have a stateful
> streaming job that uses RocksDB as a state manager. I have two stateful
> operators that are using ValueState<> and ListState<>. Every now and then my
> job fails with the following exception
>
> java.lang.Exception: Could not restore checkpointed state to operators and
> functions
>       at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:552)
>       at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:250)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: File
> file:/data/flink/checkpoints/226c84df02e47d1b9c036ba894503145/StreamMap_12_5/dummy_state/chk-83
> does not exist
>       at
> org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:609)
>       at
> org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:822)
>       at
> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:599)
>       at
> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
>       at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:337)
>       at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:289)
>       at
> org.apache.hadoop.fs.LocalFileSystem.copyToLocalFile(LocalFileSystem.java:88)
>       at org.apache.hadoop.fs.FileSystem.copyToLocalFile(FileSystem.java:1975)
>       at
> org.apache.flink.streaming.util.HDFSCopyToLocal$1.run(HDFSCopyToLocal.java:48)
>
>
> Can someone help me with this, Is this  a known issue ?
>
> Thanks
>
> Abiy Hailemichael
> Software Engineer
> Email: [hidden email]
>

Reply | Threaded
Open this post in threaded view
|

Re: Stateful Stream Processing with RocksDB causing Job failure

Ufuk Celebi
On Wed, Dec 21, 2016 at 2:52 PM,  <[hidden email]> wrote:
> So the RocksDB state backend should only be used in conjunctions with hdfs
> or s3 on a multi node cluster?

Yes. Otherwise there is no way to restore the checkpoint on a different host.
Reply | Threaded
Open this post in threaded view
|

Re: Stateful Stream Processing with RocksDB causing Job failure

Abiy Legesse Hailemichael
Thanks, that helps.

Sent from my iPhone

> On Dec 21, 2016, at 12:08 PM, Ufuk Celebi <[hidden email]> wrote:
>
>> On Wed, Dec 21, 2016 at 2:52 PM,  <[hidden email]> wrote:
>> So the RocksDB state backend should only be used in conjunctions with hdfs
>> or s3 on a multi node cluster?
>
> Yes. Otherwise there is no way to restore the checkpoint on a different host.