Hi mates, since the 1.5 release, BucketingSink has been able to configure the suffix of part files. This is very useful when you need to give the files a specific extension.
While using it, I found an issue: when a new part file is created, it gets the same part index as the file that was just closed, so when Flink tries to move it into its final state, we get a FileAlreadyExistsException. The problem is in the code below, which searches for the next part index that does not yet exist in the bucket directory. The partSuffix is not included when the candidate path is assembled, so the probed path never exists:
Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
while (fs.exists(partPath) ||

I’ll create an issue and try to submit a fix.

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team
email: [hidden email]
mobile: +7 (925) 416-37-26
CleverDATA
make your data clever
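To make the effect of the missing suffix concrete, here is a minimal plain-Java sketch. It is not Flink's actual code: the bucket directory is modelled as a set of file names, the class and method names are hypothetical, and the "_" state prefix and the ".pending"/".in-progress" state suffixes are assumptions based on the file names quoted in this thread.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical, simplified model of the part-index search described above.
 * "existing" stands in for the contents of the bucket directory.
 */
public class PartIndexProbe {

    // Assumed naming conventions, mirroring file names quoted in this thread.
    static final String PART_PREFIX = "part";
    static final String STATE_PREFIX = "_";
    static final String PENDING_SUFFIX = ".pending";
    static final String IN_PROGRESS_SUFFIX = ".in-progress";

    /** True if the name exists as a finished, pending, or in-progress file. */
    static boolean usedInAnyState(Set<String> existing, String name) {
        return existing.contains(name)
                || existing.contains(STATE_PREFIX + name + PENDING_SUFFIX)
                || existing.contains(STATE_PREFIX + name + IN_PROGRESS_SUFFIX);
    }

    /** Buggy variant: probes names WITHOUT the part suffix. */
    static int nextIndexIgnoringSuffix(Set<String> existing, int subtask, int counter) {
        while (usedInAnyState(existing, PART_PREFIX + "-" + subtask + "-" + counter)) {
            counter++;
        }
        return counter;
    }

    /** Fixed variant: probes the same names the sink will actually create. */
    static int nextIndexWithSuffix(Set<String> existing, int subtask, int counter, String partSuffix) {
        while (usedInAnyState(existing, PART_PREFIX + "-" + subtask + "-" + counter + partSuffix)) {
            counter++;
        }
        return counter;
    }

    public static void main(String[] args) {
        // A part file left over in the in-progress state, with the suffix applied.
        Set<String> existing = new HashSet<>();
        existing.add("_part-0-0.my.in-progress");

        // Prints index 0: the probe misses the leftover file, so the names collide.
        System.out.println("ignoring suffix -> index " + nextIndexIgnoringSuffix(existing, 0, 0));
        // Prints index 1: the probe sees the leftover file and moves on.
        System.out.println("with suffix     -> index " + nextIndexWithSuffix(existing, 0, 0, ".my"));
    }
}
```

The only difference between the two variants is whether partSuffix takes part in the probed name; that mirrors the direction of the fix proposed in this thread, though the real patch may differ in detail.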
I’ve created a JIRA issue, https://issues.apache.org/jira/browse/FLINK-9603, and opened a PR with a proposed fix.
Thx
Hi mates, could anyone please take a look at my PR that fixes the incorrect part-file indexing in the BucketingSink component?
Thx
In reply to this post by Rinat
Hi Rinat, I tried the scenario you described and it works fine for me: partCounter is incremented as expected, and I did not see any new part file reuse an existing part index. Here is my code, so you can take a look. In my case, the highest-indexed finished part file is part-0-683PartSuffix; all later files are still _part-0-684PartSuffix.pending, _part-0-685PartSuffix.pending, and so on, because the checkpoint has not finished yet. Cheers, Minglei.
public class TestSuffix {
Hi Minglei!
Thanks for your reply. I really have no idea why everything works in your case; the unit tests I implemented in my PR show that the problem exists. Could you please let me know which Flink version you use? The current fix targets the current master branch. Here is an example of a unit test that shows the problem:
@Test
Hi Minglei!
I’ve implemented a group of tests showing that the problem occurs only when a part suffix is specified and a previous part file still exists in the pending state. Here is the exception:

testThatPartIndexIsIncrementedWhenPartSuffixIsSpecifiedAndPreviousPartFileInProgressState(org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTest)  Time elapsed: 0.018 sec  <<< ERROR!
java.io.IOException: File already exists: /var/folders/v9/r7ybtp9n4lj_6ybx5xnngyzm0000gn/T/junit8543902037302786417/junit2291904425846970077/part-0-0.my.in-progress
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:259)
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:252)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:784)
    at org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:71)
    at org.apache.flink.streaming.connectors.fs.StringWriter.open(StringWriter.java:69)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:587)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:458)
    at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:111)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTest.testThatPartIndexIsIncremented(BucketingSinkTest.java:970)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTest.testThatPartIndexIsIncrementedWhenPartSuffixIsSpecifiedAndPreviousPartFileInProgressState(BucketingSinkTest.java:909)

You could add the following test to the org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTest class:

@Test//(expected = IOException.class)

and check that it fails on the current master branch. I’ve also opened a PR that fixes this problem (https://github.com/apache/flink/pull/6176).

For some reason I still can’t compile the whole Flink repository, so I couldn’t run your example locally from the IDE. From my point of view, though, the problem exists, and the test above shows it. Please have a look. I’m working on assembling the Flink project on my local machine … Thx
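The failing scenario can also be approximated on the local filesystem with java.nio.file. This is a hypothetical reproduction, not the BucketingSink test itself: only the in-progress state is modelled, the ".my" suffix and file names are taken from the stack trace, and the class and helper names are made up. A leftover part-0-0.my.in-progress file is created first; the suffix-less probe then picks index 0, and Files.createFile fails with FileAlreadyExistsException, the java.nio counterpart of the "File already exists" IOException in the trace.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

/** Hypothetical reproduction; only the in-progress state is modelled. */
public class SuffixCollisionRepro {

    static final String IN_PROGRESS_SUFFIX = ".in-progress"; // as in the stack trace

    static Path inProgress(Path bucket, int subtask, int counter, String partSuffix) {
        return bucket.resolve("part-" + subtask + "-" + counter + partSuffix + IN_PROGRESS_SUFFIX);
    }

    /** Finds the first free index, probing with or without the part suffix. */
    static int probe(Path bucket, int subtask, int counter, String partSuffix, boolean includeSuffix) {
        String probedSuffix = includeSuffix ? partSuffix : "";
        while (Files.exists(inProgress(bucket, subtask, counter, probedSuffix))) {
            counter++;
        }
        return counter;
    }

    /** Returns true if opening the chosen part file fails because it already exists. */
    static boolean collides(boolean includeSuffix) throws IOException {
        Path bucket = Files.createTempDirectory("bucket");
        String partSuffix = ".my";
        try {
            // Leftover in-progress file from an earlier attempt, with the suffix applied.
            Files.createFile(inProgress(bucket, 0, 0, partSuffix));
            int index = probe(bucket, 0, 0, partSuffix, includeSuffix);
            try {
                // The file that is actually opened always carries the suffix.
                Files.createFile(inProgress(bucket, 0, index, partSuffix));
                return false;
            } catch (FileAlreadyExistsException e) {
                return true;
            }
        } finally {
            // Remove the temporary bucket directory and its files.
            try (Stream<Path> files = Files.list(bucket)) {
                for (Path file : (Iterable<Path>) files::iterator) {
                    Files.delete(file);
                }
            }
            Files.delete(bucket);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("probe without suffix collides: " + collides(false)); // true
        System.out.println("probe with suffix collides:    " + collides(true));  // false
    }
}
```

The collision needs a counter that points at an index whose suffixed file already exists. A sink that keeps running and advances its counter after every new file would not normally get there, which may explain why the earlier reply did not reproduce the problem.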