Hey,
I have some kind of concurrency problem with the BucketingSink when I write to S3. I use the AvroKeyValueSinkWriter.

The problem is that when I send events that are supposed to be written to the same directory, but to different part files (because of different event types), the files overwrite each other. The problem occurs only when I sink the files to S3. When I write the files to local storage it does not happen, but I think that is only because of this loop in openNewPartFile:

    // The following loop tries different partCounter values in ascending order until it reaches the minimum
    // that is not yet used. This works since there is only one parallel subtask that tries names with this
    // subtask id. Otherwise we would run into concurrency issues here. This is aligned with the way we now
    // clean the base directory in case of rescaling.

    int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
    Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
    while (fs.exists(partPath) ||
            fs.exists(getPendingPathFor(partPath)) ||
            fs.exists(getInProgressPathFor(partPath))) {
        bucketState.partCounter++;
        partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
    }

That makes sense. But on S3 the files do not exist until checkpointing, so the loop won't find them.

After debugging, I've noticed that in the invoke method, in state.getBucketState(), the first time I try to write an event to the bucket it creates a new bucketState in the HashMap, but the second time I try to write to the same bucket (with the different event), it does find this new bucketState.

Thanks for the help!

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
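To make the failure mode concrete, here is a small self-contained simulation (not Flink code) of the probing loop quoted above. It is a sketch under several assumptions: `ToyFs`, `Writer`, and the paths are hypothetical; the toy file system checks only the final part path rather than the pending and in-progress variants; and "delayed visibility" stands in for the S3 behaviour described above, where a file does not show up until it is closed. Two writers each keep their own counter and share subtask index 0, i.e. the loop comment's assumption that only one subtask tries names with this id is violated.

```java
import java.util.HashSet;
import java.util.Set;

/** Simulation (not Flink code) of the partCounter probing loop in openNewPartFile. */
public class ProbeLoopDemo {

    static final String PART_PREFIX = "part";

    /** Toy file system. With delayedVisibility, newly created files are invisible to
     *  exists() until commit(), mimicking the S3 behaviour described in the post. */
    public static final class ToyFs {
        private final boolean delayedVisibility;
        private final Set<String> visible = new HashSet<>();
        private final Set<String> notYetVisible = new HashSet<>();

        public ToyFs(boolean delayedVisibility) {
            this.delayedVisibility = delayedVisibility;
        }

        public boolean exists(String path) {
            return visible.contains(path);
        }

        public void create(String path) {
            if (delayedVisibility) {
                notYetVisible.add(path);
            } else {
                visible.add(path);
            }
        }

        /** Stand-in for the moment the files are closed, e.g. at a checkpoint. */
        public void commit() {
            visible.addAll(notYetVisible);
            notYetVisible.clear();
        }
    }

    /** Each hypothetical writer keeps its own counter, like a separate bucketState. */
    public static final class Writer {
        int partCounter = 0;
    }

    /** Same shape as the quoted loop: advance partCounter until exists() says the name is free. */
    public static String openNewPartFile(ToyFs fs, Writer writer, String bucketPath, int subtaskIndex) {
        String partPath = bucketPath + "/" + PART_PREFIX + "-" + subtaskIndex + "-" + writer.partCounter;
        while (fs.exists(partPath)) {
            writer.partCounter++;
            partPath = bucketPath + "/" + PART_PREFIX + "-" + subtaskIndex + "-" + writer.partCounter;
        }
        fs.create(partPath);
        return partPath;
    }

    public static void main(String[] args) {
        for (boolean delayed : new boolean[] {false, true}) {
            ToyFs fs = new ToyFs(delayed);
            // Two writers, same bucket, same subtask index 0.
            String a = openNewPartFile(fs, new Writer(), "/data/bucket", 0);
            String b = openNewPartFile(fs, new Writer(), "/data/bucket", 0);
            System.out.println((delayed ? "S3-like: " : "local:   ") + a + " vs " + b
                    + (a.equals(b) ? "  -> COLLISION" : ""));
        }
        // local:   /data/bucket/part-0-0 vs /data/bucket/part-0-1
        // S3-like: /data/bucket/part-0-0 vs /data/bucket/part-0-0  -> COLLISION
    }
}
```

With immediate visibility the second writer sees `part-0-0` and moves on to `part-0-1`; with delayed visibility both writers settle on `part-0-0`, which matches the overwriting reported here.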
Hi,
I'm afraid the BucketingSink does not work well with S3 because of the eventually-consistent nature of S3. As you noticed in the code snippet you sent, the sink relies on existence checks and directory listings being accurate, which is not the case with S3. The Flink community is aware of this problem, and it's one of the top priorities for the next release after Flink 1.5.

Best,
Aljoscha

> On 19. Feb 2018, at 17:39, galantaa <[hidden email]> wrote:
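By contrast, a naming scheme that never asks the file system whether a name is free cannot be fooled by stale existence checks. The sketch below is a hypothetical workaround, not part of the BucketingSink API and not the fix the Flink community is planning: each writer (the class name `UniquePartNames` is made up) embeds a random UUID in its part names and keeps its counter in memory, where a real sink would also need to checkpoint it. It only addresses name collisions; it does nothing about the other problems eventual consistency causes, such as inaccurate directory listings.

```java
import java.util.UUID;

/** Hypothetical part-file naming that does not rely on fs.exists() probing. */
public class UniquePartNames {
    private final String partPrefix;
    private final int subtaskIndex;
    // Random per-writer token: two writers sharing a directory and a subtask index
    // still produce different names (a UUID clash is astronomically unlikely).
    private final String writerId = UUID.randomUUID().toString();
    // Kept in memory only; a real sink would have to store it in checkpointed state.
    private long partCounter = 0;

    public UniquePartNames(String partPrefix, int subtaskIndex) {
        this.partPrefix = partPrefix;
        this.subtaskIndex = subtaskIndex;
    }

    /** Returns the next part path for this writer without touching the file system. */
    public String nextPartPath(String bucketPath) {
        return bucketPath + "/" + partPrefix + "-" + subtaskIndex + "-" + writerId + "-" + partCounter++;
    }

    public static void main(String[] args) {
        UniquePartNames a = new UniquePartNames("part", 0);
        UniquePartNames b = new UniquePartNames("part", 0);
        String pa = a.nextPartPath("/data/bucket");
        String pb = b.nextPartPath("/data/bucket");
        System.out.println(pa);
        System.out.println(pb);
        System.out.println("distinct: " + !pa.equals(pb));
    }
}
```

The trade-off is less readable file names in exchange for not depending on what S3 reports at the moment the file is opened.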