[BucketingSink] incorrect indexing of part files, when part suffix is specified

Rinat
Hi mates, since the 1.5 release, BucketingSink has been able to configure the suffix of part files. This is very useful when files need a specific extension.

While using it, I’ve found an issue: when a new part file is created, it gets the same part index as the file that was just closed.
So, when Flink tries to move it into its final state, we get a FileAlreadyExistsException.

This problem is related to the following code.
Here we try to find the lowest part index, starting from the current partCounter, for which no part file exists in the bucket directory. The problem is that partSuffix is not included when the path is assembled. This means the checked paths never match part files written with the suffix, so this loop never increments partCounter.
    Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
    while (fs.exists(partPath) ||
            fs.exists(getPendingPathFor(partPath)) ||
            fs.exists(getInProgressPathFor(partPath))) {
        bucketState.partCounter++;
        partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
    }

    bucketState.creationTime = processingTimeService.getCurrentProcessingTime();

The partSuffix is only appended here, right before the writer is created, but it should already be appended before the index checks:
    if (partSuffix != null) {
        partPath = partPath.suffix(partSuffix);
    }
I’ll create an issue and try to submit a fix.
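A minimal sketch of the reordering proposed above, runnable without Flink. A Set<String> of file names stands in for fs.exists(), and the class and method names (PartIndexSearch, nextPartIndex) are hypothetical, not Flink API. It assumes, as in the unit tests later in this thread, empty pending/in-progress prefixes and the suffixes ".pending" and ".in-progress".

```java
import java.util.HashSet;
import java.util.Set;

public class PartIndexSearch {

    // Assumed configuration (as in the unit tests in this thread):
    // empty pending/in-progress prefixes, these two state suffixes.
    static final String PENDING_SUFFIX = ".pending";
    static final String IN_PROGRESS_SUFFIX = ".in-progress";

    /**
     * Returns the first index >= start for which no final, pending or in-progress
     * file exists. With includeSuffix == false the part suffix is left out of the
     * checked names (the reported behaviour); with true it is appended first.
     */
    public static int nextPartIndex(Set<String> existing, String prefix, int subtask,
                                    int start, String partSuffix, boolean includeSuffix) {
        int counter = start;
        while (true) {
            String name = prefix + "-" + subtask + "-" + counter;
            if (includeSuffix && partSuffix != null) {
                name = name + partSuffix;
            }
            boolean taken = existing.contains(name)
                    || existing.contains(name + PENDING_SUFFIX)
                    || existing.contains(name + IN_PROGRESS_SUFFIX);
            if (!taken) {
                return counter;
            }
            counter++;
        }
    }

    public static void main(String[] args) {
        // A pending file left over from a previous run, written with suffix ".my".
        Set<String> existing = new HashSet<>();
        existing.add("part-0-0.my.pending");

        int unsuffixedCheck = nextPartIndex(existing, "part", 0, 0, ".my", false);
        int suffixedCheck = nextPartIndex(existing, "part", 0, 0, ".my", true);

        System.out.println("suffix appended after the check:  part-0-" + unsuffixedCheck + ".my");
        System.out.println("suffix appended before the check: part-0-" + suffixedCheck + ".my");
    }
}
```

With the leftover part-0-0.my.pending, the search that ignores the suffix settles on index 0 again, while the search that appends the suffix first moves on to index 1.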

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

mobile: +7 (925) 416-37-26

CleverDATA
make your data clever

Re: [BucketingSink] incorrect indexing of part files, when part suffix is specified

Rinat
I’ve created a JIRA issue, https://issues.apache.org/jira/browse/FLINK-9603, and added a proposed fix as a PR.

Thx

On 16 Jun 2018, at 17:21, Rinat <[hidden email]> wrote:

Re: [BucketingSink] incorrect indexing of part files, when part suffix is specified

Rinat
Hi mates, could anyone please have a look at my PR that fixes the incorrect indexing in the BucketingSink component?

Thx

On 18 Jun 2018, at 10:55, Rinat <[hidden email]> wrote:

Re: [BucketingSink] incorrect indexing of part files, when part suffix is specified

zhangminglei
In reply to this post by Rinat
Hi, Rinat

I tried the situation you described and it works fine for me: partCounter is incremented as expected, and I did not see any duplicated part index when new part files were created. Here is my code, so you can take a look.
In my case, the finished part file with the highest index is part-0-683PartSuffix; all the others are still kept as _part-0-684PartSuffix.pending, _part-0-685PartSuffix.pending and so on, since the checkpoint has not finished.

Cheers
Minglei.

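import java.util.UUID;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

public class TestSuffix {
    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        String outputPath = params.getRequired("outputPath");

        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        sEnv.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"));
        sEnv.enableCheckpointing(200);
        sEnv.setParallelism(1);

        // Roll part files after 500 bytes and tag every part file with a suffix.
        BucketingSink<Tuple4<Integer, String, String, Integer>> sink =
            new BucketingSink<Tuple4<Integer, String, String, Integer>>(outputPath)
                .setInactiveBucketThreshold(1000)
                .setInactiveBucketCheckInterval(1000)
                .setPartSuffix("PartSuffix")
                .setBatchSize(500);

        sEnv.addSource(new DataGenerator())
            .keyBy(0)
            .map(new CountUpRichMap())
            .addSink(sink);

        sEnv.execute();
    }

    // Appends a per-key running count (kept in keyed state) to each record.
    public static class CountUpRichMap
            extends RichMapFunction<Tuple3<Integer, String, String>, Tuple4<Integer, String, String, Integer>> {

        private ValueState<Integer> counter;

        @Override
        public void open(Configuration parameters) throws Exception {
            counter = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Types.INT));
        }

        @Override
        public Tuple4<Integer, String, String, Integer> map(Tuple3<Integer, String, String> value) throws Exception {
            Integer counterValue = counter.value();
            if (counterValue == null) {
                counterValue = 0;
            }
            counter.update(counterValue + 1);
            return Tuple4.of(value.f0, value.f1, value.f2, counterValue);
        }
    }

    // Finite source: emits 10000 records spread over 10 keys.
    public static class DataGenerator implements SourceFunction<Tuple3<Integer, String, String>> {

        private volatile boolean running = true;

        public DataGenerator() {
        }

        @Override
        public void run(SourceContext<Tuple3<Integer, String, String>> ctx) throws Exception {
            for (int i = 0; i < 10000 && running; i++) {
                ctx.collect(Tuple3.of(i % 10, UUID.randomUUID().toString(), "Some payloads......"));
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}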
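Minglei’s listing also shows where the part suffix lands in intermediate file names: it becomes part of the file name, so it sits before the ".pending" state suffix. A minimal sketch reproducing those names with java.nio.file; pendingPathFor is a hypothetical helper modeled on what the names in this thread suggest BucketingSink’s getPendingPathFor produces, the "_" prefix and ".pending" suffix are taken from the names above, and /tmp/out is a placeholder bucket directory.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class PartFileNames {

    // Hypothetical helper: same directory, prefix before the file name,
    // state suffix after it.
    public static Path pendingPathFor(Path part, String pendingPrefix, String pendingSuffix) {
        return part.resolveSibling(pendingPrefix + part.getFileName() + pendingSuffix);
    }

    public static void main(String[] args) {
        Path bucket = Paths.get("/tmp/out"); // placeholder bucket directory
        String base = "part-0-684";
        String partSuffix = "PartSuffix";

        // Name actually written: the part suffix is already in the file name.
        Path written = pendingPathFor(bucket.resolve(base + partSuffix), "_", ".pending");
        // Name an existence check looks for if the suffix is left out.
        Path checked = pendingPathFor(bucket.resolve(base), "_", ".pending");

        System.out.println(written.getFileName()); // _part-0-684PartSuffix.pending
        System.out.println(checked.getFileName()); // _part-0-684.pending
    }
}
```

The two names differ, so an existence check built without the suffix cannot see a pending file that was written with it.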

Re: [BucketingSink] incorrect indexing of part files, when part suffix is specified

Rinat
Hi Minglei!

Thanks for your reply. I really have no idea why everything works in your case; I have implemented unit tests in my PR that show the problem exists. Could you please let me know which Flink version you use?
The current fix targets the current master branch. Here is an example of a unit test that shows the problem:

    @Test
    public void testThatPartIndexIsIncrementedWhenPartSuffixIsSpecified() throws Exception {
        String partSuffix = ".my";

        File outDir = tempFolder.newFolder();
        long inactivityInterval = 100;

        // A pending part file with the suffix, left over from a previous run.
        java.nio.file.Path bucket = Paths.get(outDir.getPath());
        Files.createFile(bucket.resolve("part-0-0.my.pending"));

        String basePath = outDir.getAbsolutePath();
        BucketingSink<String> sink = new BucketingSink<String>(basePath)
            .setBucketer(new BasePathBucketer<>())
            .setInactiveBucketCheckInterval(inactivityInterval)
            .setInactiveBucketThreshold(inactivityInterval)
            .setPartPrefix(PART_PREFIX)
            .setInProgressPrefix("")
            .setPendingPrefix("")
            .setValidLengthPrefix("")
            .setInProgressSuffix(IN_PROGRESS_SUFFIX)
            .setPendingSuffix(PENDING_SUFFIX)
            .setValidLengthSuffix(VALID_LENGTH_SUFFIX)
            .setPartSuffix(partSuffix)
            .setBatchSize(0);

        OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(sink, 1, 0);
        testHarness.setup();
        testHarness.open();

        testHarness.setProcessingTime(0L);
        testHarness.processElement(new StreamRecord<>("test1", 1L));

        // Advance past the inactivity threshold so the part file is closed,
        // then complete a checkpoint so it is moved to its final name.
        testHarness.setProcessingTime(101L);
        testHarness.snapshot(0, 0);
        testHarness.notifyOfCompletedCheckpoint(0);
        sink.close();

        // The new part file must get the next index and keep the part suffix.
        assertThat(Files.exists(bucket.resolve("part-0-1" + partSuffix)), is(true));
    }

On 24 Jun 2018, at 06:02, zhangminglei <[hidden email]> wrote:

Re: [BucketingSink] incorrect indexing of part files, when part suffix is specified

Rinat
Hi Minglei!

I’ve implemented a group of tests showing that the problem occurs only when a part suffix is specified and a previous part file already exists in the pending or in-progress state.

Here is the exception:

testThatPartIndexIsIncrementedWhenPartSuffixIsSpecifiedAndPreviousPartFileInProgressState(org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTest)  Time elapsed: 0.018 sec  <<< ERROR!
java.io.IOException: File already exists: /var/folders/v9/r7ybtp9n4lj_6ybx5xnngyzm0000gn/T/junit8543902037302786417/junit2291904425846970077/part-0-0.my.in-progress
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:259)
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:252)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:784)
    at org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:71)
    at org.apache.flink.streaming.connectors.fs.StringWriter.open(StringWriter.java:69)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:587)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:458)
    at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:111)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTest.testThatPartIndexIsIncremented(BucketingSinkTest.java:970)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTest.testThatPartIndexIsIncrementedWhenPartSuffixIsSpecifiedAndPreviousPartFileInProgressState(BucketingSinkTest.java:909)


You could add the following tests to org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTest:

    @Test
    public void testThatPartIndexIsIncrementedWhenPartSuffixIsSpecifiedAndPreviousPartFileInProgressState()
            throws Exception {
        testThatPartIndexIsIncremented(".my", "part-0-0.my" + IN_PROGRESS_SUFFIX);
    }

    // Creates a leftover part file, writes one element, and checks that the new
    // part file gets the next index (1) and keeps the part suffix.
    private void testThatPartIndexIsIncremented(String partSuffix, String existingPartFile) throws Exception {
        File outDir = tempFolder.newFolder();
        long inactivityInterval = 100;

        java.nio.file.Path bucket = Paths.get(outDir.getPath());
        Files.createFile(bucket.resolve(existingPartFile));

        String basePath = outDir.getAbsolutePath();
        BucketingSink<String> sink = new BucketingSink<String>(basePath)
            .setBucketer(new BasePathBucketer<>())
            .setInactiveBucketCheckInterval(inactivityInterval)
            .setInactiveBucketThreshold(inactivityInterval)
            .setPartPrefix(PART_PREFIX)
            .setInProgressPrefix("")
            .setPendingPrefix("")
            .setValidLengthPrefix("")
            .setInProgressSuffix(IN_PROGRESS_SUFFIX)
            .setPendingSuffix(PENDING_SUFFIX)
            .setValidLengthSuffix(VALID_LENGTH_SUFFIX)
            .setPartSuffix(partSuffix)
            .setBatchSize(0);

        OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(sink, 1, 0);
        testHarness.setup();
        testHarness.open();

        testHarness.setProcessingTime(0L);
        testHarness.processElement(new StreamRecord<>("test1", 1L));

        testHarness.setProcessingTime(101L);
        testHarness.snapshot(0, 0);
        testHarness.notifyOfCompletedCheckpoint(0);
        sink.close();

        String expectedFileName = partSuffix == null ? "part-0-1" : "part-0-1" + partSuffix;
        assertThat(Files.exists(bucket.resolve(expectedFileName)), is(true));
    }

and check that the test fails.

It’s relevant for the current master branch; I’ve also opened a PR that fixes this problem (https://github.com/apache/flink/pull/6176).

For some reason I still can’t compile the whole Flink repository to run your example locally from the IDE, but from my point of view the problem exists, and the test above shows it. Please have a look.

I’m working on assembling the Flink project on my local machine…

Thx


On 25 Jun 2018, at 10:44, Rinat <[hidden email]> wrote: