Good day everyone,
I have a Flink job that uses an S3 folder as a source, and we keep putting thousands of small (around 1 KB each) gzip files into that folder, at a rate of about 5,000 files per minute. Here is how I created that source in Scala:

    val my_input_format = new TextInputFormat(new org.apache.flink.core.fs.Path(my_path))
    my_input_format.setFilesFilter(FilePathFilter.createDefaultFilter())
    my_input_format.setNestedFileEnumeration(true)

    val my_raw_stream = streamEnv
      .readFile(my_input_format,
        my_path,
        FileProcessingMode.PROCESS_CONTINUOUSLY,
        1000)

The problem is that with the monitoring interval of 1,000 ms as above, about 20% of the files are missed. In the Apache Flink Dashboard, at the subsequent operators, I can only see ~80% of the total number of files ("Records sent" column).

If I increase the monitoring interval, the number of missed files drops: at 5,000 ms it is about 10%, and at 30,000 ms only about 2% are missed.

No WARNING/ERROR is recorded, though.

I could not reproduce this on HDFS, as I could not reach such a high file-writing rate in our cluster.

Could someone please help? Thank you very much.

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
It could be related to S3, which appears to be configured for eventual consistency. Maybe it helps to configure strong consistency.
However, I recommend replacing S3 with a NoSQL database (since you are on Amazon, DynamoDB + DynamoDB Streams would help; alternatively SNS or SQS). The small file size and high rate are not a good fit for S3 or HDFS.
Hi Jörn,
Thanks. I had missed that EMRFS strong-consistency configuration; I will try it now. We also have a backup solution: using Kinesis instead of S3 (I don't see Kinesis in your suggestion, but I hope it would be all right).

Regarding "the small size and high rate is not suitable for S3 or HDFS": are there any guidelines on how big files should be before S3/HDFS becomes a good fit?

Thanks a lot.
Just an update: I tried enabling the "EMRFS Consistent View" option, but it didn't help. I'm not sure whether that is what you recommended, or something else. Thanks!
Sure, Kinesis is another way.
Can you try read-after-write consistency (assuming the files are not modified)? In any case, it looks like you would be better served by a NoSQL store or Kinesis (I don't know your exact use case, so I cannot provide more details).
Could you please explain "try read-after-write consistency (assuming the files are not modified)" in more detail?

I guess the problem I'm seeing comes from inconsistency in S3 file listing; otherwise, I would have got file-not-found exceptions.

My use case is to read output files from another system. That system was built some years ago and outputs files to its S3 bucket. There is no file modification; only new files are created. We want to avoid modifying that system.
You will find a passage there about the consistency model. Probably the system is putting the files into the folder and Flink is triggered before they are consistent. What happens after Flink reads them from S3? Are they reused by another system, or is it just archival? If they are reused, then probably go for a NoSQL solution (e.g. DynamoDB); if they are just archived, then use Kinesis + S3.
Hello Jörn.
Thanks for your help. "Probably the system is putting them to the folder and Flink is triggered before they are consistent" <<< yes, I also guess so. However, if Flink is triggered before they are consistent, then either (a) there should be some error messages, or (b) Flink should be able to pick those files up in subsequent triggers. In my case, however, those files are missed forever.

Right now the files on S3 are consumed by Flink only. The flow is as follows:

    Existing system >>> S3 >>> Flink >>> Elastic Search

If I cannot find a solution to the mentioned problem, I might need to change to one of:

    Existing system >>> Kinesis >>> Flink >>> Elastic Search
    Existing system >>> S3 >>> Kinesis >>> Flink >>> Elastic Search
    Existing system >>> S3 >>> Custom File Source + Flink >>> Elastic Search

However, all of those solutions would take much more effort. Thanks!
Hi,

The problem is that Flink tracks which files it has read by remembering the modification timestamp of the file that was added (or modified) last. We use the modification timestamp to avoid having to remember the names of all files that were ever consumed, which would be expensive to check and store over time.

One could change this logic to a hybrid approach that keeps the names of all files whose modification timestamp is larger than the maximum modification timestamp minus an offset.

It would be great if you could open a Jira issue for this problem.

Thanks, Fabian
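The hybrid approach Fabian describes can be sketched as follows. This is an illustrative, self-contained sketch under stated assumptions, not Flink's actual implementation (the class and method names are invented): track the maximum modification timestamp seen so far, and additionally remember the names of files whose timestamp falls within an offset of that maximum, so that files listed late by S3 inside the window are neither dropped nor read twice.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the hybrid tracking logic (not Flink's code):
// a file is processed if its modification time is no older than
// (maxModTime - offset) and its name has not yet been seen in that window.
class HybridFileTracker {
    private final long offsetMs;
    private long maxModTime = 0L; // max modification timestamp seen so far
    // file name -> mod time, kept only for files inside the offset window
    private final Map<String, Long> recentFiles = new HashMap<>();

    HybridFileTracker(long offsetMs) {
        this.offsetMs = offsetMs;
    }

    /** Returns true if the file is new and should be processed now. */
    boolean shouldProcess(String name, long modTime) {
        if (modTime < maxModTime - offsetMs) {
            return false; // older than the window: assumed already consumed
        }
        if (recentFiles.containsKey(name)) {
            return false; // inside the window, but already processed
        }
        recentFiles.put(name, modTime);
        maxModTime = Math.max(maxModTime, modTime);
        // drop names that fell out of the window, bounding the state size
        recentFiles.values().removeIf(t -> t < maxModTime - offsetMs);
        return true;
    }
}
```

With offset = 0 this roughly degenerates to the timestamp-only logic; a larger offset tolerates more listing lag at the cost of remembering more file names.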
Thank you Fabian.
I implemented a quick test based on what you suggested, using an offset from the system time, and I did get an improvement: with offset = 500 ms, the problem was completely gone. With offset = 50 ms, I still got around 3-5 files missed out of 10,000. That number might come from the difference between the clocks of the EC2 instance and S3.

I will now try to implement exactly what you suggested, and open a Jira issue as well. Thanks for your help.
Hello Fabian,
I created the JIRA issue: https://issues.apache.org/jira/browse/FLINK-9940

BTW, I have one more question: is it worth checkpointing that list of processed files? Does the current implementation of the file source guarantee exactly-once?

Thanks for your support.
Hi,

Thanks for creating the Jira issue. I'm not sure I would consider it a blocker, but it is certainly an important problem to fix.

Anyway, in the original version Flink checkpoints the modification timestamp up to which all files have been read (or at least up to which point it *thinks* everything has been processed, in the case of S3). In case of a recovery, the timestamp is reset to the checkpointed value and all files with a larger modification timestamp are processed again. This reset of the read position, together with resetting the state of all operators, results in exactly-once state consistency.

In order to avoid that the data of files is added twice to the state of an operator, the monitoring source must ensure that it does not read again data that was processed before the checkpoint was committed. So, if you add an offset to the modification timestamp and track by name the processed files whose timestamp is larger than the checkpointed modification timestamp, these names must be included in the checkpoint as well.

Best, Fabian
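As a rough, self-contained illustration of Fabian's point (class names invented; the real logic lives in Flink's ContinuousFileMonitoringFunction): the checkpointed read position must carry both the timestamp and the file names inside the offset window, since restoring only the timestamp would cause the files inside the window to be emitted a second time.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the read position a monitoring source would
// checkpoint: the max modification timestamp AND the names of files whose
// timestamp lies inside the offset window.
class ReadPosition {
    private final long offsetMs;
    long maxModTime = 0L;
    Set<String> processedInWindow = new HashSet<>();

    ReadPosition(long offsetMs) {
        this.offsetMs = offsetMs;
    }

    /** Immutable state captured at a checkpoint. */
    static class Snapshot {
        final long maxModTime;
        final Set<String> names;
        Snapshot(long maxModTime, Set<String> names) {
            this.maxModTime = maxModTime;
            this.names = new HashSet<>(names); // copy: later mutations must not leak in
        }
    }

    Snapshot snapshot() {
        return new Snapshot(maxModTime, processedInWindow);
    }

    /** Reset the read position to a previously checkpointed state. */
    void restore(Snapshot s) {
        maxModTime = s.maxModTime;
        processedInWindow = new HashSet<>(s.names);
    }

    /** True if this file must be skipped to avoid emitting its data twice. */
    boolean alreadyProcessed(String name, long modTime) {
        return modTime < maxModTime - offsetMs || processedInWindow.contains(name);
    }
}
```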
Thank you, Fabian, for the guidance on implementing the fix.

I'm not quite clear on the best practice for JIRA tickets. I changed the issue's priority to Major, since you said it is important. What happens next with that issue? Will someone pick it up, create a fix, and include it in a following release? Thanks!
Hi,

First of all, the ticket reports a bug (or an improvement or feature suggestion) so that others are aware of the problem and understand its cause. At some point it might be picked up and implemented. In general, there is no guarantee whether or when this happens, but the Flink community is of course eager to fix bugs, and this is a rather important problem, so it might be addressed soon. However, I cannot promise in which release it will be fixed.

You can of course help the community (and yourself) and contribute a fix for this problem. Scratching your own itch is a good way to get started contributing to open-source projects ;-).

Best, Fabian
Here is my implementation of the change: https://github.com/lvhuyen/flink

Three files were updated: StreamExecutionEnvironment.java, StreamExecutionEnvironment.scala, and ContinuousFileMonitoringFunction.java.

All the thanks to Fabian.