StreamingFileSink seems to be overwriting existing part files

Bruno Aranda
Hi,

One of the main reasons we moved to version 1.7 (and 1.7.2 in particular) was the ability to use a StreamingFileSink with S3.

We've configured a StreamingFileSink with a DateTimeBucketAssigner to bucket by day. It has a parallelism of 1 and writes to S3 from an EMR cluster in AWS.
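
For illustration, the sink is wired up roughly like this (a minimal sketch; the S3 path, date pattern and string encoder are simplified placeholders, not our exact configuration):

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

    public class DailyBucketSinkJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Placeholder source; the real job reads from our actual input.
            DataStream<String> stream = env.fromElements("event-1", "event-2", "event-3");

            // Row-format sink bucketed by day ("yyyy-MM-dd"), producing part-<subtask>-<counter> files.
            StreamingFileSink<String> sink = StreamingFileSink
                    .forRowFormat(new Path("s3://my-bucket/events"),
                                  new SimpleStringEncoder<String>("UTF-8"))
                    .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd"))
                    .build();

            stream.addSink(sink).setParallelism(1);
            env.execute("daily-bucket-sink");
        }
    }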

We ran the job and, after a few hours of activity, manually cancelled it through the JobManager API. After confirming that a number of "part-0-x" files existed in S3 at the expected path, we started the job again with the same "flink run ..." CLI invocation originally used to start it.

It started writing data to S3 again, starting afresh from "part-0-0", which gradually overwrote the existing data.

I can understand that without a checkpoint there is no indication of where to resume, but the fact that it overwrites the existing files (as it starts writing to part-0-0 again) is surprising. Wouldn't one expect it to find the last part and continue from the next free number?

We're definitely using flink-s3-fs-hadoop-1.7.2.jar and don't have the Presto version on the classpath.

Is this the expected behaviour? We have not seen this with the non-streaming version of the sink.

Best regards,

Bruno
Re: StreamingFileSink seems to be overwriting existing part files

Kostas Kloudas
Hi Bruno,

This is the expected behaviour: the job starts "fresh", given that you did not specify any savepoint/checkpoint to restore from.

As for the expectation that the sink should find the last part and continue from the next free number: I am not sure how this can be achieved safely and efficiently in an eventually consistent object store like S3.
This is actually the reason why, unlike the BucketingSink, the StreamingFileSink relies on Flink's own state to determine the "next" part counter.
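
Concretely, the part counter only survives a restart if the job is resumed from that state, e.g. by taking a savepoint on cancel and then starting with "flink run -s <savepointPath> ...". A minimal sketch of the prerequisite on the job side, with a 60-second checkpoint interval chosen purely for illustration:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointedJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // The StreamingFileSink keeps its per-bucket part counters and in-progress
            // file state in Flink's checkpointed state; checkpointing must be enabled
            // for part files to be finalized and for the counters to be restorable.
            env.enableCheckpointing(60_000L);

            // Placeholder pipeline; the real job would build its sources,
            // transformations and the StreamingFileSink here.
            env.fromElements(1, 2, 3).print();

            env.execute("checkpointed-streaming-file-sink-job");
        }
    }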

Cheers,
Kostas

Re: StreamingFileSink seems to be overwriting existing part files

Bruno Aranda
Hi Kostas,

Put that way, sounds fair enough. Many thanks for the clarification,

Cheers,

Bruno

Re: StreamingFileSink seems to be overwriting existing part files

Kostas Kloudas
No problem!

Cheers,
Kostas
