rocksdb backend on s3 window operator checkpoint issue


rocksdb backend on s3 window operator checkpoint issue

Chen Qin
Hi there,

I have been testing checkpointing with RocksDB backed by S3. Checkpoints seem successful except for the snapshot state of the timeWindow operator on a keyedStream. Here is the env setting I used:
env.setStateBackend(new RocksDBStateBackend(new URI("s3://backupdir/")))

The checkpoint fails consistently when it reaches window operator snapshotting. The exception log is attached below.
With env.setStateBackend(new RocksDBStateBackend(new URI("file:///tmp/checkpoints"))); or the default MemoryStateBackend, checkpoints work with no issue.

Has anyone seen this issue before? Or did I mess up the configuration?

Thanks,
Chen

2016-05-16 17:20:32,132 INFO  org.apache.flink.runtime.state.filesystem.FsStateBackend      - Initializing file state backend to URI s3://xxx/checkpoints/7e6abf126ce3d18f173733b34eda81a9
2016-05-16 17:20:32,423 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Using user-defined state backend: org.apache.flink.contrib.streaming.state.RocksDBStateBackend@2fa68a53
2016-05-16 17:20:32,423 INFO  org.apache.flink.runtime.state.filesystem.FsStateBackend      - Initializing file state backend to URI s3://uber-beats/sjc1/checkpoints/7e6abf126ce3d18f173733b34eda81a9
2016-05-16 17:21:31,423 INFO  org.apache.flink.contrib.streaming.state.AbstractRocksDBState  - RocksDB (/directory/flink-io-723a5c14-2a8a-4abc-881a-9a60138816b0/7e6abf126ce3d18f173733b34eda81a9/WindowOperator_131_0/window-contents/dbc64864de-8373-4b41-bd74-a26a8007f066) backup (synchronous part) took 8 ms.
2016-05-16 17:21:36,125 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Caught exception while materializing asynchronous checkpoints.
com.amazonaws.AmazonClientException: Unable to calculate MD5 hash:/directory//flink-io-723a5c14-2a8a-4abc-881a-9a60138816b0/7e6abf126ce3d18f173733b34eda81a9/WindowOperator_131_0/window-contents/local-chk-599 (Is a directory)
at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
at com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
at com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
at com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
...
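The "(Is a directory)" part of the message comes from the JDK: before uploading, the S3 client opens the local path as a regular file to compute its MD5 hash, and local-chk-599 here is a RocksDB backup directory, not a single file. A minimal JDK-only sketch of that failure mode (the class name is made up; this is not Flink or AWS SDK code):

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;

public class Md5OnDirectorySketch {
    public static void main(String[] args) throws IOException {
        // Stand-in for the local RocksDB backup directory from the log.
        File dir = new File(System.getProperty("java.io.tmpdir"), "local-chk-demo");
        dir.mkdirs();

        // Opening a directory as a file stream fails immediately; a client that
        // does this to hash the "file" before upload surfaces the error as
        // "Unable to calculate MD5 hash: ... (Is a directory)".
        try (FileInputStream in = new FileInputStream(dir)) {
            System.out.println("unexpected: opened a directory as a file");
        } catch (FileNotFoundException e) {
            System.out.println("failed as expected: " + e.getMessage());
        }
    }
}
```

A filesystem implementation that only supports single-object puts cannot upload a per-checkpoint directory, which would be consistent with the file:// backend working while s3:// fails.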

The test looks like:
.setParallelism(1)
.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<String>() {
    @Override
    public Watermark checkAndGetNextWatermark(String element, long extractedTimestamp) {
        // Watermark lags 60 seconds behind the current processing time.
        return new Watermark(System.currentTimeMillis() - 60 * 1000L);
    }

    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        return System.currentTimeMillis();
    }
})
.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
    @Override
    public void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception {
        collector.collect(new Tuple2<>(s, 1L));
    }
})
.keyBy(0)
.timeWindow(Time.seconds(60))
.apply(new RichWindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple, TimeWindow>() {
    @Override
    public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<String, Long>> iterable,
                      Collector<Tuple2<String, Long>> collector) throws Exception {
        log.info("trigger fired at {}", System.currentTimeMillis());
        collector.collect(new Tuple2<>(timeWindow.toString(), 1L));
    }
})
.rebalance()
.addSink(new FakeSink<>());

JobExecutionResult result = env.execute();
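One note on the assigner above: timestamps are the current wall-clock time while watermarks lag 60 seconds behind it, so each 60-second event-time window only fires about a minute after it closes. A small self-contained sketch of that arithmetic (windowStart/fires are hypothetical helpers mirroring how tumbling windows and watermark-driven firing generally behave, not Flink API):

```java
public class WindowFiringSketch {
    // Tumbling windows of the given size: timestamp t falls into [start, start + size).
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // An event-time window fires once the watermark reaches its last timestamp.
    static boolean fires(long windowStart, long size, long watermark) {
        return watermark >= windowStart + size - 1;
    }

    public static void main(String[] args) {
        long size = 60_000L;
        long now = 120_000L;             // pretend wall-clock time in ms
        long eventTs = now;              // extractTimestamp returns "now"
        long watermark = now - 60_000L;  // checkAndGetNextWatermark lags 60s

        long start = windowStart(eventTs, size);               // 120000
        System.out.println(fires(start, size, watermark));     // false: watermark is still a window behind
        System.out.println(fires(start, size, now + 60_000L)); // true: ~60s after the window end
    }
}
```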




Re: rocksdb backend on s3 window operator checkpoint issue

rmetzger0
Hi,

From the code you've provided, everything looks okay. I'm currently trying to reproduce the issue.
Which Flink version are you using?

Which S3 implementation did you configure in the Hadoop configuration?
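(For reference, the scheme-to-implementation mapping is usually set in Hadoop's core-site.xml; a sketch assuming Hadoop 2.7+ with hadoop-aws on the classpath — adjust to whatever you actually configured:)

```xml
<!-- core-site.xml: choose which implementation backs the s3:// scheme. -->
<property>
  <name>fs.s3.impl</name>
  <!-- Alternative: org.apache.hadoop.fs.s3native.NativeS3FileSystem (s3n semantics) -->
  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
```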

Regards,
Robert


On Mon, May 16, 2016 at 11:52 PM, Chen Qin <[hidden email]> wrote:



Re: rocksdb backend on s3 window operator checkpoint issue

rmetzger0
I tried reproducing the issue using org.apache.hadoop.fs.s3a.S3AFileSystem and it worked.
I had some dependency issues with the S3AFileSystem, so I didn't follow that path further for now. If you've used the S3AFileSystem, I can try to get that one working as well.
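(The dependency issues are typically about getting hadoop-aws, which contains S3AFileSystem, plus a matching aws-java-sdk onto the classpath; a Maven sketch, where the version is a placeholder that must match your Hadoop build:)

```xml
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-aws</artifactId>
  <!-- must match the Hadoop version Flink runs against; 2.7.2 is an assumption -->
  <version>2.7.2</version>
</dependency>
```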
