sink with BucketingSink to S3 files override

sink with BucketingSink to S3 files override

galantaa
Hey,
I have some kind of a concurrency problem with the BucketingSink when I write
to S3.
I use the AvroKeyValueSinkWriter.
The problem is that when I send events that are supposed to be written to the
same directory, but to different part files (because of different event
types), the files overwrite each other.
The problem occurs only when I sink the files to S3.
When I write the files to local storage it does not happen, but I think
that's only because of this loop in openNewPartFile:

// The following loop tries different partCounter values in ascending order
// until it reaches the minimum that is not yet used. This works since there
// is only one parallel subtask that tries names with this subtask id.
// Otherwise we would run into concurrency issues here. This is aligned with
// the way we now clean the base directory in case of rescaling.

int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
Path partPath = new Path(bucketPath,
        partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
while (fs.exists(partPath) ||
        fs.exists(getPendingPathFor(partPath)) ||
        fs.exists(getInProgressPathFor(partPath))) {
    bucketState.partCounter++;
    partPath = new Path(bucketPath,
            partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
}
That makes sense. But on S3 the files do not exist until checkpointing, so
the loop won't find the files.
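
To make the failure mode concrete, here is a minimal, self-contained sketch
(my own illustration, not the actual Flink code) of the naming logic above,
with fs.exists() replaced by a lookup that always misses, the way S3 behaves
for in-progress files:

import java.util.HashSet;
import java.util.Set;

public class PartCounterCollision {

    // Stand-in for fs.exists(): on S3, in-progress part files are not
    // visible until the upload completes at checkpoint time, so this set
    // stays empty and every lookup misses.
    private static final Set<String> visibleFiles = new HashSet<>();

    private static int partCounter = 0;

    static String openNewPartFile(String bucketPath, int subtaskIndex) {
        String partPath =
                bucketPath + "/part-" + subtaskIndex + "-" + partCounter;
        // Mirrors the exists() loop: it never advances the counter,
        // because nothing is visible yet.
        while (visibleFiles.contains(partPath)) {
            partCounter++;
            partPath =
                    bucketPath + "/part-" + subtaskIndex + "-" + partCounter;
        }
        return partPath;
    }

    public static void main(String[] args) {
        // Two part files opened for the same bucket (e.g. two event types)
        // both resolve to ".../part-0-0", so the second overwrites the first.
        System.out.println(openNewPartFile("s3://bucket/dir", 0));
        System.out.println(openNewPartFile("s3://bucket/dir", 0));
    }
}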

After debugging, I've noticed that in the invoke method, in
state.getBucketState(), the first time I try to write an event to the bucket
it creates a new bucketState in the HashMap, but the second time I try to
write to the same bucket (with the different event type) it finds this
existing bucketState.
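
In case it helps, here is a simplified, self-contained sketch (again my own
illustration, not the actual Flink code) of the lookup behaviour I'm
describing: the state is keyed by bucket path only, so two different event
types writing to the same bucket end up sharing one bucketState and one
partCounter:

import java.util.HashMap;
import java.util.Map;

public class BucketStateLookup {

    static class BucketState {
        long partCounter = 0;
    }

    private final Map<String, BucketState> bucketStates = new HashMap<>();

    BucketState getBucketState(String bucketPath) {
        // The first call for a path creates the state; later calls return
        // the same instance, regardless of which event type is written.
        return bucketStates.computeIfAbsent(bucketPath,
                p -> new BucketState());
    }

    public static void main(String[] args) {
        BucketStateLookup state = new BucketStateLookup();
        // Same bucket path for two event types -> the same state object.
        System.out.println(state.getBucketState("s3://bucket/dir")
                == state.getBucketState("s3://bucket/dir")); // true
    }
}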

Thanks for the help!




Re: sink with BucketingSink to S3 files override

Aljoscha Krettek
Hi,

I'm afraid the BucketingSink does not work well with S3 because of the eventually-consistent nature of S3. As you noticed in the code snippet you sent, the sink relies on directory listings being accurate, which is not the case with S3.

The Flink community is aware of this problem and it's one of the top priorities for the next release after Flink 1.5.
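
Until then, one possible workaround (a sketch under assumptions, not an
official fix): if the clashing part files correspond to different event
types, a custom Bucketer can route each type into its own bucket directory,
so the generated part-file names no longer collide. This assumes your events
expose their type somehow; the Event interface and getType() method below are
placeholders for whatever you actually have:

import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;

// Puts every event type into its own bucket, e.g. /base/typeA and
// /base/typeB, so the part counters of different types cannot collide.
public class EventTypeBucketer implements Bucketer<EventTypeBucketer.Event> {

    private static final long serialVersionUID = 1L;

    // Placeholder for your actual event class; the only requirement is
    // some accessor for the event's type.
    public interface Event {
        String getType();
    }

    @Override
    public Path getBucketPath(Clock clock, Path basePath, Event element) {
        return new Path(basePath, element.getType());
    }
}

You would register it with sink.setBucketer(new EventTypeBucketer()). Note
that this changes the directory layout, so it only helps if per-type
subdirectories are acceptable for your downstream consumers.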

Best,
Aljoscha

> On 19. Feb 2018, at 17:39, galantaa <[hidden email]> wrote: