S3 file source - continuous monitoring - many files missed

Averell
Good day everyone,

I have a Flink job that uses an S3 folder as a source, and we keep putting
thousands of small gzip files (around 1KB each) into that folder, at a rate
of about 5,000 files per minute. Here is how I created that source in
Scala:

    val my_input_format = new TextInputFormat(
        new org.apache.flink.core.fs.Path(my_path))
    my_input_format.setFilesFilter(FilePathFilter.createDefaultFilter())
    my_input_format.setNestedFileEnumeration(true)

    val my_raw_stream = streamEnv
        .readFile(my_input_format,
            my_path,
            FileProcessingMode.PROCESS_CONTINUOUSLY,
            1000)
The problem is that with the monitoring interval of 1,000 ms as above,
about 20% of the files are missed. In the Apache Flink dashboard, the
subsequent operators show only ~80% of the total number of files (the
"Records sent" column).

If I increase the monitoring interval, the number of missed files goes
down: at 5,000 ms about 10% are missed, and at 30,000 ms only about 2%.

No WARNING/ERROR was logged, though.

I could not reproduce this on HDFS, as I could not reach such a high
file-writing rate in our cluster.

Could someone please help? Thank you very much.

Re: S3 file source - continuous monitoring - many files missed

Jörn Franke
It could be related to S3, which seems to be configured for eventual consistency. Maybe it helps to configure strong consistency.

However, I recommend replacing S3 with a NoSQL database (since you are on Amazon, DynamoDB plus DynamoDB Streams would help; alternatively SNS or SQS). Files this small, at this high a rate, are not a good fit for S3 or HDFS.


Re: S3 file source - continuous monitoring - many files missed

Averell
Hi Jörn,
Thanks. I had missed that EMRFS strong-consistency configuration. I will
try that now.
We also have a backup solution: using Kinesis instead of S3 (I don't see
Kinesis in your suggestions, but hope that it would be alright).

Regarding "the small size and high rate is not suitable for S3 or HDFS":
are there any guidelines on how big files should be before S3/HDFS becomes
a reasonable choice?

Thanks a lot.




Re: S3 file source - continuous monitoring - many files missed

Averell
Just an update: I tried enabling the "EMRFS Consistent View" option, but it
didn't help. I am not sure whether that is what you recommended or something
else.
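For reference, this is roughly the setting involved. A sketch of the EMR
configuration (the emrfs-site classification; the exact property names are
an assumption on my part):

    [
      {
        "Classification": "emrfs-site",
        "Properties": {
          "fs.s3.consistent": "true"
        }
      }
    ]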

Thanks!




Re: S3 file source - continuous monitoring - many files missed

Jörn Franke
Sure, Kinesis is another way.

Can you try read-after-write consistency (assuming the files are not modified)?

In any case, it looks like you would be better served by a NoSQL store or Kinesis (I don't know your exact use case, so I cannot provide more details).


Re: S3 file source - continuous monitoring - many files missed

Averell
Could you please explain "try read-after-write consistency (assuming the
files are not modified)" in more detail?
I guess the problem I am seeing comes from inconsistency in S3 file
listing; otherwise, I would have gotten file-not-found exceptions.

My use case is to read output files from another system. That system was
built some years back and writes files to its S3 bucket. There is no file
modification; only new files are created. We want to avoid modifying that
system.




Re: S3 file source - continuous monitoring - many files missed

Jörn Franke

You will find there a passage about the consistency model.

Probably the system is putting them into the folder and Flink is triggered before they are consistent.

What happens after Flink puts them on S3? Are they reused by another system, or is it just archival?

If they are reused, then probably go for a NoSQL solution (e.g. DynamoDB); if they are just archived, then use Kinesis + S3.


Re: S3 file source - continuous monitoring - many files missed

Averell
Hello Jörn.

Thanks for your help.
"/Probably the system is putting them to the folder and Flink is triggered
before they are consistent./" <<< yes, I also guess so. However, if Flink is
triggered before they are consistent, either (a) there should be some error
messages, or (b) Flink should be able to identify those files in the
subsequent triggers. But in my case, those files are missed forever.

Right now those files for S3 are to be consumed by Flink only. The flow is
as follow:
           Existing system >>> S3 >>> Flink >>> Elastic Search.
If I cannot find a solution to the mentioned problem, I might need to change
to:
           Existing system >>> Kinesis >>> Flink >>> Elastic Search
Or
           Existing system >>> S3 >>> Kinesis >>> Flink >>> Elastic Search  
Or
           Existing system >>> S3 >>> Custom File Source + Flink >>> Elastic
Search
However, all those solutions would take much more effort.

Thanks!





Re: S3 file source - continuous monitoring - many files missed

Fabian Hueske-2
Hi,

The problem is that Flink tracks which files it has read by remembering the modification time of the file that was added (or modified) last.
We use the modification time to avoid having to remember the names of all files that were ever consumed, which would be expensive to check and store over time.

One could change this logic to a hybrid approach that keeps the names of all files whose modification timestamp is larger than the max mod time minus an offset, as sketched below.
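A minimal sketch of that idea, in Scala (all names are made up here; this
is not the actual ContinuousFileMonitoringFunction code):

    import scala.collection.mutable

    // Hybrid tracking: the max modification timestamp seen so far, plus the
    // names of all files whose timestamp lies within `offsetMs` of that max.
    class SeenFilesTracker(offsetMs: Long) {
      private var maxModTime = 0L // mod times are epoch millis, 0 is the past
      // file path -> modification time, for files inside the tracked window
      private val recentFiles = mutable.Map[String, Long]()

      /** Returns true exactly once for each newly observed file. */
      def isNew(path: String, modTime: Long): Boolean = {
        if (modTime < maxModTime - offsetMs || recentFiles.contains(path)) {
          false // too old to be tracked, or already processed
        } else {
          recentFiles(path) = modTime
          if (modTime > maxModTime) {
            maxModTime = modTime
            // evict names that fell out of the tracked window
            recentFiles.retain((_, t) => t >= maxModTime - offsetMs)
          }
          true
        }
      }
    }

The monitoring function would then ask isNew(...) for every listed file
instead of comparing against the max mod time alone.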

It would be great if you could open a Jira issue for this problem.

Thanks, Fabian



Re: S3 file source - continuous monitoring - many files missed

Averell
Thank you Fabian.

I implemented a quick test based on what you suggested, applying an offset
to the system time, and I did get an improvement: with offset = 500 ms the
problem is completely gone. With offset = 50 ms I still get around 3-5
files missed out of 10,000. This remainder might come from the difference
between the clocks of the EC2 instance and S3.
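In case it helps others, the quick test boils down to relaxing the
"already seen" comparison by an offset, roughly like this (a sketch only;
the names are made up, not the actual ContinuousFileMonitoringFunction
code):

    // Before (roughly): a file is ignored unless it is strictly newer than
    // the largest modification time seen so far:
    //   modTime <= maxSeenModTime
    // Quick test: subtract an offset, so files that appear "late" in the S3
    // listing are still picked up. Files inside the window can be listed
    // more than once, which the name tracking Fabian described would fix.
    def shouldIgnore(modTime: Long, maxSeenModTime: Long, offsetMs: Long): Boolean =
      modTime <= maxSeenModTime - offsetMs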

I will now implement exactly what you suggested, and open a Jira issue as
well.

Thanks for your help.




Re: S3 file source - continuous monitoring - many files missed

Averell
Hello Fabian,

I created the JIRA bug: https://issues.apache.org/jira/browse/FLINK-9940
BTW, I have one more question: is it worth checkpointing that list of
processed files? Does the current implementation of the file source
guarantee exactly-once?

Thanks for your support.





Re: S3 file source - continuous monitoring - many files missed

Fabian Hueske-2
Hi,

Thanks for creating the Jira issue.
I'm not sure if I would consider this a blocker but it is certainly an important problem to fix.

Anyway, in the original version Flink checkpoints the modification timestamp up to which all files have been read (or, in the case of S3, up to which it *thinks* everything has been processed).
On recovery, the timestamp is reset to the checkpointed value and all files with a larger mod timestamp are processed again. This reset of the read position, together with resetting the state of all operators, results in exactly-once state consistency.

In order to avoid that the data of a file is added twice to the state of an operator, the monitoring source must ensure that it does not read data again that was processed before the checkpoint was committed.
So, if you add an offset to the mod timestamp and track processed files whose timestamp is larger than the checkpointed mod timestamp by file name, these names must be included in the checkpoint as well.
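To make that concrete, the checkpointed state would grow from a single
long to roughly the following shape (a sketch with illustrative names, not
the actual state of the Flink source):

    // Illustrative shape of the checkpointed state for the hybrid approach.
    // The timestamp alone is no longer enough: the file names inside the
    // offset window must be snapshotted too, so that a recovered job does
    // not re-emit files that were already processed.
    case class MonitoringCheckpointState(
      maxModTime: Long,               // largest modification time seen so far
      recentFiles: Map[String, Long]  // files with modTime > maxModTime - offset
    )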

Best, Fabian



Re: S3 file source - continuous monitoring - many files missed

Averell
Thank you, Fabian, for the guidance on implementing the fix.

I'm not quite clear about the best practices for creating a JIRA ticket. I
changed its priority to Major since you said it is important.
What happens next with that issue? Will someone (anyone) pick it up and
create a fix, to be included in a following release?

Thanks!




Re: S3 file source - continuous monitoring - many files missed

Fabian Hueske-2
Hi,

First of all, the ticket reports a bug (or improvement or feature suggestion) so that others are aware of the problem and understand its cause.

At some point it might be picked up and implemented. In general, there is no guarantee whether or when this happens, but the Flink community is of course eager to fix bugs, and this is a rather important problem.
So it might be addressed soon. However, I cannot promise in which release it will be fixed.

You can of course help the community (and yourself) by contributing a fix for this problem.
Scratching your own itch is a good way to get started in contributing to open source projects ;-).

Best, Fabian



Re: S3 file source - continuous monitoring - many files missed

Averell
Here is my implementation of the change: https://github.com/lvhuyen/flink
Three files were updated: StreamExecutionEnvironment.java,
StreamExecutionEnvironment.scala, and ContinuousFileMonitoringFunction.java.

Many thanks to Fabian.


