(no subject)

(no subject)

Abiy Legesse Hailemichael
I am running a standalone Flink cluster (1.1.2) with a stateful streaming job that uses RocksDB as the state backend. Two stateful operators use ValueState<> and ListState<>. Every now and then the job fails with the following exception:

Caused by: AsynchronousException{java.io.FileNotFoundException: File file:/data/flink/checkpoints/471ef8996921bb9c29434abf35490a26/StreamMap_12_0/dummy_state/chk-4 does not exist}
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870)
Caused by: java.io.FileNotFoundException: File file:/data/flink/checkpoints/471ef8996921bb9c29434abf35490a26/StreamMap_12_0/dummy_state/chk-4 does not exist
        at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:609)
        at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:822)
        at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:599)
        at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
        at org.apache.hadoop.fs.FileSystem.getContentSummary(FileSystem.java:1467)
        at org.apache.flink.contrib.streaming.state.RocksDBStateBackend$FinalSemiAsyncSnapshot.getStateSize(RocksDBStateBackend.java:688)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskStateList.getStateSize(StreamTaskStateList.java:89)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:860)



Abiy Hailemichael
Software Engineer
Phone: (202) 355-8933
Email: [hidden email]

Re:

Fabian Hueske-2
Hi Abiy,

To which type of filesystem are you persisting your checkpoints?

We have seen problems with S3 and its consistency model. These issues have been addressed in newer versions of Flink.
I am not sure whether the fix already went into 1.1.3, but release 1.1.4 is currently being voted on and contains many other bug fixes as well.
I would suggest upgrading to 1.1.3, or even to 1.1.4 once it is released (which should happen in a few days if no regression is found).

Best, Fabian
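
On the filesystem question above: the URI in the exception message starts with `file:` and the trace goes through `RawLocalFileSystem`, which suggests the checkpoints are written to a local filesystem rather than S3. The following is only a small sketch of how the scheme could be read off such a message; the helper names `checkpoint_uri` and `filesystem_scheme` are hypothetical and not part of Flink.

```python
from urllib.parse import urlparse

# Exception message copied from the stack trace in this thread.
MESSAGE = (
    "File file:/data/flink/checkpoints/471ef8996921bb9c29434abf35490a26/"
    "StreamMap_12_0/dummy_state/chk-4 does not exist"
)


def checkpoint_uri(message: str) -> str:
    """Return the URI between the leading 'File ' and the trailing ' does not exist'.

    Hypothetical helper; it assumes the message format seen in this thread.
    """
    prefix, suffix = "File ", " does not exist"
    if not (message.startswith(prefix) and message.endswith(suffix)):
        raise ValueError("unexpected message format")
    return message[len(prefix):len(message) - len(suffix)]


def filesystem_scheme(uri: str) -> str:
    """Return the URI scheme, for example 'file', 'hdfs' or 's3'."""
    return urlparse(uri).scheme


uri = checkpoint_uri(MESSAGE)
scheme = filesystem_scheme(uri)
print(scheme)
```

A `file` scheme only identifies the filesystem type; it does not by itself explain why the `chk-4` directory was missing.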

(In reply to the message from Abiy Legesse Hailemichael, 2016-12-20 21:19 GMT+01:00.)


Re:

Fabian Hueske-2
This issue was posted twice to the mailing list.

Please continue the discussion in the other thread, which has the subject "Stateful Stream Processing with RocksDB causing Job failure".

(In reply to the message from Fabian Hueske, 2016-12-21 9:44 GMT+01:00.)