StreamingFileSink doesn't close multipart uploads to s3?

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

StreamingFileSink doesn't close multipart uploads to s3?

Li Peng-2
Hey folks, I'm trying to get StreamingFileSink to write to s3 every minute, with flink-s3-fs-hadoop, and based on the default rolling policy, which is configured to "roll" every 60 seconds, I thought that would be automatic (I interpreted rolling to mean actually close a multipart upload to s3).

But I'm not actually seeing files written to s3 at all, instead I see a bunch of open multipart uploads when I check the AWS s3 console, for example:

 "Uploads": [
        {
            "Initiated": "2019-12-06T20:57:47.000Z",
            "Key": "2019-12-06--20/part-0-0"
        },
        {
            "Initiated": "2019-12-06T20:57:47.000Z",
            "Key": "2019-12-06--20/part-1-0"
        },
        {
            "Initiated": "2019-12-06T21:03:12.000Z",
            "Key": "2019-12-06--21/part-0-1"
        },
        {
            "Initiated": "2019-12-06T21:04:15.000Z",
            "Key": "2019-12-06--21/part-0-2"
        },
        {
            "Initiated": "2019-12-06T21:22:23.000Z"
            "Key": "2019-12-06--21/part-0-3"
        }
]

And these uploads are being open for a long time. So far after an hour, none of the uploads have been closed. Is this the expected behavior? If I wanted to get these uploads to actually write to s3 quickly, do I need to configure the hadoop stuff to get that done, like setting a smaller buffer/partition size to force it to upload

Thanks,
Li
Reply | Threaded
Open this post in threaded view
|

Re: StreamingFileSink doesn't close multipart uploads to s3?

Li Peng-2
Ok I seem to have solved the issue by enabling checkpointing. Based on the docs (I'm using 1.9.0), it seemed like only StreamingFileSink.forBulkFormat() should've required checkpointing, but based on this experience, StreamingFileSink.forRowFormat() requires it too! Is this the intended behavior? If so, the docs should probably be updated.

Thanks,
Li

On Fri, Dec 6, 2019 at 2:01 PM Li Peng <[hidden email]> wrote:
Hey folks, I'm trying to get StreamingFileSink to write to s3 every minute, with flink-s3-fs-hadoop, and based on the default rolling policy, which is configured to "roll" every 60 seconds, I thought that would be automatic (I interpreted rolling to mean actually close a multipart upload to s3).

But I'm not actually seeing files written to s3 at all, instead I see a bunch of open multipart uploads when I check the AWS s3 console, for example:

 "Uploads": [
        {
            "Initiated": "2019-12-06T20:57:47.000Z",
            "Key": "2019-12-06--20/part-0-0"
        },
        {
            "Initiated": "2019-12-06T20:57:47.000Z",
            "Key": "2019-12-06--20/part-1-0"
        },
        {
            "Initiated": "2019-12-06T21:03:12.000Z",
            "Key": "2019-12-06--21/part-0-1"
        },
        {
            "Initiated": "2019-12-06T21:04:15.000Z",
            "Key": "2019-12-06--21/part-0-2"
        },
        {
            "Initiated": "2019-12-06T21:22:23.000Z"
            "Key": "2019-12-06--21/part-0-3"
        }
]

And these uploads are being open for a long time. So far after an hour, none of the uploads have been closed. Is this the expected behavior? If I wanted to get these uploads to actually write to s3 quickly, do I need to configure the hadoop stuff to get that done, like setting a smaller buffer/partition size to force it to upload

Thanks,
Li
Reply | Threaded
Open this post in threaded view
|

Re: StreamingFileSink doesn't close multipart uploads to s3?

Kostas Kloudas-2
Hi Li,

This is the expected behavior. All the "exactly-once" sinks in Flink
require checkpointing to be enabled.
We will update the documentation to be clearer in the upcoming release.

Thanks a lot,
Kostas

On Sat, Dec 7, 2019 at 3:47 AM Li Peng <[hidden email]> wrote:

>
> Ok I seem to have solved the issue by enabling checkpointing. Based on the docs (I'm using 1.9.0), it seemed like only StreamingFileSink.forBulkFormat() should've required checkpointing, but based on this experience, StreamingFileSink.forRowFormat() requires it too! Is this the intended behavior? If so, the docs should probably be updated.
>
> Thanks,
> Li
>
> On Fri, Dec 6, 2019 at 2:01 PM Li Peng <[hidden email]> wrote:
>>
>> Hey folks, I'm trying to get StreamingFileSink to write to s3 every minute, with flink-s3-fs-hadoop, and based on the default rolling policy, which is configured to "roll" every 60 seconds, I thought that would be automatic (I interpreted rolling to mean actually close a multipart upload to s3).
>>
>> But I'm not actually seeing files written to s3 at all, instead I see a bunch of open multipart uploads when I check the AWS s3 console, for example:
>>
>>  "Uploads": [
>>         {
>>             "Initiated": "2019-12-06T20:57:47.000Z",
>>             "Key": "2019-12-06--20/part-0-0"
>>         },
>>         {
>>             "Initiated": "2019-12-06T20:57:47.000Z",
>>             "Key": "2019-12-06--20/part-1-0"
>>         },
>>         {
>>             "Initiated": "2019-12-06T21:03:12.000Z",
>>             "Key": "2019-12-06--21/part-0-1"
>>         },
>>         {
>>             "Initiated": "2019-12-06T21:04:15.000Z",
>>             "Key": "2019-12-06--21/part-0-2"
>>         },
>>         {
>>             "Initiated": "2019-12-06T21:22:23.000Z"
>>             "Key": "2019-12-06--21/part-0-3"
>>         }
>> ]
>>
>> And these uploads are being open for a long time. So far after an hour, none of the uploads have been closed. Is this the expected behavior? If I wanted to get these uploads to actually write to s3 quickly, do I need to configure the hadoop stuff to get that done, like setting a smaller buffer/partition size to force it to upload?
>>
>> Thanks,
>> Li
Reply | Threaded
Open this post in threaded view
|

Re: StreamingFileSink doesn't close multipart uploads to s3?

Jingsong Li
Hi Kostas,

I  took a look to StreamingFileSink.close, it just delete all temporary files. I know it is for failover. When Job fail, it should just delete temp files for next restart.
But for testing purposes, we just want to run a bounded streaming job. If there is no checkpoint trigger, no one will move the final temp files to output path, so the result of this job is wrong.
Do you have any idea about this? Can we distinguish "fail close" from "success finish close" in StreamingFileSink?

Best,
Jingsong Lee

On Mon, Dec 9, 2019 at 10:32 PM Kostas Kloudas <[hidden email]> wrote:
Hi Li,

This is the expected behavior. All the "exactly-once" sinks in Flink
require checkpointing to be enabled.
We will update the documentation to be clearer in the upcoming release.

Thanks a lot,
Kostas

On Sat, Dec 7, 2019 at 3:47 AM Li Peng <[hidden email]> wrote:
>
> Ok I seem to have solved the issue by enabling checkpointing. Based on the docs (I'm using 1.9.0), it seemed like only StreamingFileSink.forBulkFormat() should've required checkpointing, but based on this experience, StreamingFileSink.forRowFormat() requires it too! Is this the intended behavior? If so, the docs should probably be updated.
>
> Thanks,
> Li
>
> On Fri, Dec 6, 2019 at 2:01 PM Li Peng <[hidden email]> wrote:
>>
>> Hey folks, I'm trying to get StreamingFileSink to write to s3 every minute, with flink-s3-fs-hadoop, and based on the default rolling policy, which is configured to "roll" every 60 seconds, I thought that would be automatic (I interpreted rolling to mean actually close a multipart upload to s3).
>>
>> But I'm not actually seeing files written to s3 at all, instead I see a bunch of open multipart uploads when I check the AWS s3 console, for example:
>>
>>  "Uploads": [
>>         {
>>             "Initiated": "2019-12-06T20:57:47.000Z",
>>             "Key": "2019-12-06--20/part-0-0"
>>         },
>>         {
>>             "Initiated": "2019-12-06T20:57:47.000Z",
>>             "Key": "2019-12-06--20/part-1-0"
>>         },
>>         {
>>             "Initiated": "2019-12-06T21:03:12.000Z",
>>             "Key": "2019-12-06--21/part-0-1"
>>         },
>>         {
>>             "Initiated": "2019-12-06T21:04:15.000Z",
>>             "Key": "2019-12-06--21/part-0-2"
>>         },
>>         {
>>             "Initiated": "2019-12-06T21:22:23.000Z"
>>             "Key": "2019-12-06--21/part-0-3"
>>         }
>> ]
>>
>> And these uploads are being open for a long time. So far after an hour, none of the uploads have been closed. Is this the expected behavior? If I wanted to get these uploads to actually write to s3 quickly, do I need to configure the hadoop stuff to get that done, like setting a smaller buffer/partition size to force it to upload?
>>
>> Thanks,
>> Li


--
Best, Jingsong Lee
Reply | Threaded
Open this post in threaded view
|

Re: StreamingFileSink doesn't close multipart uploads to s3?

Ken Krugler
Hi Kostas,

I didn’t see a follow-up to this, and have also run into this same issue of winding up with a bunch of .inprogress files when a bounded input stream ends and the job terminates.

When StreamingFileSystem.close() is called, shouldn’t all buckets get auto-rolled, so that the .inprogress files become part-xxx files?

Thanks,

— Ken


On Dec 9, 2019, at 6:56 PM, Jingsong Li <[hidden email]> wrote:

Hi Kostas,

I  took a look to StreamingFileSink.close, it just delete all temporary files. I know it is for failover. When Job fail, it should just delete temp files for next restart.
But for testing purposes, we just want to run a bounded streaming job. If there is no checkpoint trigger, no one will move the final temp files to output path, so the result of this job is wrong.
Do you have any idea about this? Can we distinguish "fail close" from "success finish close" in StreamingFileSink?

Best,
Jingsong Lee

On Mon, Dec 9, 2019 at 10:32 PM Kostas Kloudas <[hidden email]> wrote:
Hi Li,

This is the expected behavior. All the "exactly-once" sinks in Flink
require checkpointing to be enabled.
We will update the documentation to be clearer in the upcoming release.

Thanks a lot,
Kostas

On Sat, Dec 7, 2019 at 3:47 AM Li Peng <[hidden email]> wrote:
>
> Ok I seem to have solved the issue by enabling checkpointing. Based on the docs (I'm using 1.9.0), it seemed like only StreamingFileSink.forBulkFormat() should've required checkpointing, but based on this experience, StreamingFileSink.forRowFormat() requires it too! Is this the intended behavior? If so, the docs should probably be updated.
>
> Thanks,
> Li
>
> On Fri, Dec 6, 2019 at 2:01 PM Li Peng <[hidden email]> wrote:
>>
>> Hey folks, I'm trying to get StreamingFileSink to write to s3 every minute, with flink-s3-fs-hadoop, and based on the default rolling policy, which is configured to "roll" every 60 seconds, I thought that would be automatic (I interpreted rolling to mean actually close a multipart upload to s3).
>>
>> But I'm not actually seeing files written to s3 at all, instead I see a bunch of open multipart uploads when I check the AWS s3 console, for example:
>>
>>  "Uploads": [
>>         {
>>             "Initiated": "2019-12-06T20:57:47.000Z",
>>             "Key": "2019-12-06--20/part-0-0"
>>         },
>>         {
>>             "Initiated": "2019-12-06T20:57:47.000Z",
>>             "Key": "2019-12-06--20/part-1-0"
>>         },
>>         {
>>             "Initiated": "2019-12-06T21:03:12.000Z",
>>             "Key": "2019-12-06--21/part-0-1"
>>         },
>>         {
>>             "Initiated": "2019-12-06T21:04:15.000Z",
>>             "Key": "2019-12-06--21/part-0-2"
>>         },
>>         {
>>             "Initiated": "2019-12-06T21:22:23.000Z"
>>             "Key": "2019-12-06--21/part-0-3"
>>         }
>> ]
>>
>> And these uploads are being open for a long time. So far after an hour, none of the uploads have been closed. Is this the expected behavior? If I wanted to get these uploads to actually write to s3 quickly, do I need to configure the hadoop stuff to get that done, like setting a smaller buffer/partition size to force it to upload?
>>
>> Thanks,
>> Li


--
Best, Jingsong Lee

--------------------------
Ken Krugler
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr

Reply | Threaded
Open this post in threaded view
|

Re: StreamingFileSink doesn't close multipart uploads to s3?

Kostas Kloudas-2
Hi Ken, Jingsong and Li,

Sorry for the late reply.

As Jingsong pointed out, upon calling close() the StreamingFileSink
does not commit the in-progress/pending files.
The reason for this is that the close() method of any UDF including
sink functions is called on both normal termination and termination
due to failure.
Given this, we cannot commit the files, because in case of failure
they should be reverted.

Actually we are currently updating the StreamingFileSink docs to
includes this among other things.
Also the differentiation between normal termination and termination
due to failure will hopefully be part of Flink 1.11 and
this is the FLIP to check
https://cwiki.apache.org/confluence/display/FLINK/FLIP-46%3A+Graceful+Shutdown+Handling+by+UDFs.

Cheers,
Kostas

On Fri, Jan 10, 2020 at 4:45 PM Ken Krugler <[hidden email]> wrote:

>
> Hi Kostas,
>
> I didn’t see a follow-up to this, and have also run into this same issue of winding up with a bunch of .inprogress files when a bounded input stream ends and the job terminates.
>
> When StreamingFileSystem.close() is called, shouldn’t all buckets get auto-rolled, so that the .inprogress files become part-xxx files?
>
> Thanks,
>
> — Ken
>
>
> On Dec 9, 2019, at 6:56 PM, Jingsong Li <[hidden email]> wrote:
>
> Hi Kostas,
>
> I  took a look to StreamingFileSink.close, it just delete all temporary files. I know it is for failover. When Job fail, it should just delete temp files for next restart.
> But for testing purposes, we just want to run a bounded streaming job. If there is no checkpoint trigger, no one will move the final temp files to output path, so the result of this job is wrong.
> Do you have any idea about this? Can we distinguish "fail close" from "success finish close" in StreamingFileSink?
>
> Best,
> Jingsong Lee
>
> On Mon, Dec 9, 2019 at 10:32 PM Kostas Kloudas <[hidden email]> wrote:
>>
>> Hi Li,
>>
>> This is the expected behavior. All the "exactly-once" sinks in Flink
>> require checkpointing to be enabled.
>> We will update the documentation to be clearer in the upcoming release.
>>
>> Thanks a lot,
>> Kostas
>>
>> On Sat, Dec 7, 2019 at 3:47 AM Li Peng <[hidden email]> wrote:
>> >
>> > Ok I seem to have solved the issue by enabling checkpointing. Based on the docs (I'm using 1.9.0), it seemed like only StreamingFileSink.forBulkFormat() should've required checkpointing, but based on this experience, StreamingFileSink.forRowFormat() requires it too! Is this the intended behavior? If so, the docs should probably be updated.
>> >
>> > Thanks,
>> > Li
>> >
>> > On Fri, Dec 6, 2019 at 2:01 PM Li Peng <[hidden email]> wrote:
>> >>
>> >> Hey folks, I'm trying to get StreamingFileSink to write to s3 every minute, with flink-s3-fs-hadoop, and based on the default rolling policy, which is configured to "roll" every 60 seconds, I thought that would be automatic (I interpreted rolling to mean actually close a multipart upload to s3).
>> >>
>> >> But I'm not actually seeing files written to s3 at all, instead I see a bunch of open multipart uploads when I check the AWS s3 console, for example:
>> >>
>> >>  "Uploads": [
>> >>         {
>> >>             "Initiated": "2019-12-06T20:57:47.000Z",
>> >>             "Key": "2019-12-06--20/part-0-0"
>> >>         },
>> >>         {
>> >>             "Initiated": "2019-12-06T20:57:47.000Z",
>> >>             "Key": "2019-12-06--20/part-1-0"
>> >>         },
>> >>         {
>> >>             "Initiated": "2019-12-06T21:03:12.000Z",
>> >>             "Key": "2019-12-06--21/part-0-1"
>> >>         },
>> >>         {
>> >>             "Initiated": "2019-12-06T21:04:15.000Z",
>> >>             "Key": "2019-12-06--21/part-0-2"
>> >>         },
>> >>         {
>> >>             "Initiated": "2019-12-06T21:22:23.000Z"
>> >>             "Key": "2019-12-06--21/part-0-3"
>> >>         }
>> >> ]
>> >>
>> >> And these uploads are being open for a long time. So far after an hour, none of the uploads have been closed. Is this the expected behavior? If I wanted to get these uploads to actually write to s3 quickly, do I need to configure the hadoop stuff to get that done, like setting a smaller buffer/partition size to force it to upload?
>> >>
>> >> Thanks,
>> >> Li
>
>
>
> --
> Best, Jingsong Lee
>
>
> --------------------------
> Ken Krugler
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
>