Good day everyone,
I have about 100 thousand files to read, and a custom FilePathFilter whose filterPath method is defined as below (the custom part only checks the file size and skips files with size = 0):

    override def filterPath(filePath: Path): Boolean = {
      filePath == null ||
        filePath.getName.startsWith(".") ||
        filePath.getName.startsWith("_") ||
        filePath.getName.contains(FilePathFilter.HADOOP_COPYING) || {
          def fileStatus = filePath.getFileSystem.getFileStatus(filePath)
          !fileStatus.isDir && fileStatus.getLen == 0
        }
    }

It runs fine either when I disable checkpointing or when I use the default FilePathFilter, and takes about 7 minutes to finish processing all files (from source to sink). However, when I have both the custom filter and checkpointing, it usually takes 15-20 minutes before Flink starts reading my files (in the Flink GUI, the Custom File Source monitor generates 0 records during that 15-20 minute period).

Could someone please help with this? Thank you very much for your time.

Best regards,
Averell
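As a side note on the snippet above (an observation about Scala semantics, not something raised later in the thread): because fileStatus is declared with def, its body is re-evaluated on every reference, so the size check issues two getFileStatus calls per regular file. A minimal sketch of a variant that makes at most one remote call per file is below; the class name SizeCheckingFilePathFilter is made up for illustration and this is not the poster's actual code:

    import org.apache.flink.api.common.io.FilePathFilter
    import org.apache.flink.core.fs.Path

    // Hypothetical variant for illustration only.
    class SizeCheckingFilePathFilter extends FilePathFilter {
      override def filterPath(filePath: Path): Boolean =
        filePath == null ||
          filePath.getName.startsWith(".") ||
          filePath.getName.startsWith("_") ||
          filePath.getName.contains(FilePathFilter.HADOOP_COPYING) || {
            // val: getFileStatus is evaluated once per file, not once per reference
            val fileStatus = filePath.getFileSystem.getFileStatus(filePath)
            !fileStatus.isDir && fileStatus.getLen == 0
          }
    }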
Hi Averell,

Is this all the custom code for "CustomFileSource"? If not, can you share the entire file with us? Also, if you can set the log level to DEBUG, it will help you analyze and locate the problem. If you can't come to a conclusion, you can share the log with us.

Thanks,
vino.

Averell <[hidden email]> wrote on Fri, Sep 21, 2018, at 6:51 AM:
Hi Vino,
I am using a custom FileInputFormat, but the mentioned problem only appears when I try a custom FilePathFilter. My whole file for that custom FilePathFilter is quoted below.

Regarding enabling DEBUG, which classes/packages should I turn it on for? I am afraid that turning DEBUG on at the global level would be too heavy.

Thanks and regards,
Averell

==================
import org.apache.flink.api.common.io.FilePathFilter
import org.apache.flink.core.fs.Path
import org.slf4j.LoggerFactory

object SdcFilePathFilter {
  private val LOG = LoggerFactory.getLogger(classOf[SdcFilePathFilter])
}

class SdcFilePathFilter(lookBackPeriod: Long, homePath: Path) extends FilePathFilter {
  private val homeDepth = homePath.depth()

  override def filterPath(filePath: Path): Boolean = {
    filePath == null ||
      filePath.getName.startsWith(".") ||
      filePath.getName.startsWith("_") ||
      filePath.getName.contains(FilePathFilter.HADOOP_COPYING) || {
        def fileStatus = filePath.getFileSystem.getFileStatus(filePath)
        !fileStatus.isDir && fileStatus.getLen == 0
      }
  }
}
Hi Vino, and all,
I tried to avoid the step of getting the file status, and found that the problem is not there any more. I guess doing that for every single file out of 100K+ files on S3 caused some issue with checkpointing. I am still trying to find the cause, but with lower priority now.

Thanks for your help.

Regards,
Averell
Hi Averell,
Happy to hear that the problem is no longer there, and if you have more news from your debugging, let us know. The thing I wanted to mention is that, from what you are describing, the problem does not seem to be related to checkpointing, but to the fact that applying your filter to the hundreds of thousands of small files takes time. This may help with your debugging.

Cheers,
Kostas

> On Sep 24, 2018, at 2:10 AM, Averell <[hidden email]> wrote:
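For a rough sense of scale (the per-call latency here is an assumed ballpark for illustration, not a measured figure): if each getFileStatus call against S3 takes on the order of 10 ms, then 100,000 files x ~10 ms is roughly 1,000 seconds, i.e. about 17 minutes of listing and filtering before any split is emitted, which is in the same range as the 15-20 minute delay reported above.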
Hi Kostas,
Yes, applying the filter to the 100K files takes time, and the 15-minute delay I observed is definitely caused by that big number of files and the cost of each individual file-status check. However, the delay is much smaller when checkpointing is off. Within those 15 minutes, the checkpointing process is not triggered, though.

Thanks and regards,
Averell
Hi Averell,
Can you describe your settings in a bit more detail? For example, are you reading in PROCESS_CONTINUOUSLY mode or PROCESS_ONCE? What is your checkpoint interval? The above is to understand why checkpoints are not processed within these 15 minutes.

Kostas

> On Sep 25, 2018, at 8:08 AM, Averell <[hidden email]> wrote:
Hi Kostas,
I use PROCESS_CONTINUOUSLY mode and a checkpoint interval of 20 minutes. When I said "Within that 15 minutes, checkpointing process is not triggered though" in my previous email, I was not complaining that checkpointing was not running, but pointing out that the slowness is not due to an ongoing checkpoint.

Thanks and regards,
Averell
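For reference, a minimal sketch of the kind of setup being described (the thread does not show the actual job code, so the S3 path, the filter's constructor arguments, and the monitoring interval are made-up values, and TextInputFormat stands in for the poster's custom FileInputFormat):

    import org.apache.flink.api.java.io.TextInputFormat
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.functions.source.FileProcessingMode
    import org.apache.flink.streaming.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Checkpoint every 20 minutes, as described above (argument is in milliseconds)
    env.enableCheckpointing(20 * 60 * 1000L)

    // Hypothetical input path and filter arguments, for illustration only
    val inputPath = "s3://some-bucket/some-prefix"
    val format = new TextInputFormat(new Path(inputPath))
    format.setFilesFilter(new SdcFilePathFilter(lookBackPeriod = 60 * 60 * 1000L, homePath = new Path(inputPath)))

    // PROCESS_CONTINUOUSLY re-scans the directory at the given interval (here: every 60 s)
    val lines: DataStream[String] =
      env.readFile(format, inputPath, FileProcessingMode.PROCESS_CONTINUOUSLY, 60 * 1000L)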
I see,
Thanks for the clarification.

Cheers,
Kostas

> On Sep 25, 2018, at 8:51 AM, Averell <[hidden email]> wrote:
Thank you Kostas for spending time on my case.
Relating to the issue I mentioned, I have another issue caused by having a lot of files to list. From the message below, I understand that the listing was taking more than 30 seconds, and the JM thought the task hung and killed it. Is it possible to increase this 30-second timer?

Thanks and regards,
Averell

===== UPDATE =====
For the question w.r.t. the timer, I found the answer:
    streamEnv.getConfig.setTaskCancellationInterval(cfgTaskTimeoutInterval)
================

2018-09-25 12:01:13.222 [Canceler/Interrupts for Source: Custom File Source (1/1) (a5f5434070044510eafc9103bc24af43).] WARN org.apache.flink.runtime.taskmanager.Task - Task 'Source: Custom File Source (1/1)' did not react to cancelling signal for 30 seconds, but is stuck in method:
    java.net.URI$Parser.scan(URI.java:2998)
    java.net.URI$Parser.parseAuthority(URI.java:3138)
    java.net.URI$Parser.parseHierarchical(URI.java:3097)
    java.net.URI$Parser.parse(URI.java:3053)
    java.net.URI.<init>(URI.java:746)
    org.apache.hadoop.fs.Path.makeQualified(Path.java:467)
    org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:464)
    com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem$$Lambda$63/515305348.apply(Unknown Source)
    com.amazon.ws.emr.hadoop.fs.s3n.BasicFileStatusFactory.newFile(BasicFileStatusFactory.java:69)
    com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.newFile(S3NativeFileSystem.java:1154)
    com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.listStatus(S3NativeFileSystem.java:962)
    com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.listStatus(S3NativeFileSystem.java:914)
    com.amazon.ws.emr.hadoop.fs.EmrFileSystem.listStatus(EmrFileSystem.java:364)
    org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:157)
    org.apache.flink.core.fs.SafetyNetWrapperFileSystem.listStatus(SafetyNetWrapperFileSystem.java:97)
    org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.listEligibleFiles(ContinuousFileMonitoringFunction.java:395)
    org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.listEligibleFiles(ContinuousFileMonitoringFunction.java:416)
    org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.listEligibleFiles(ContinuousFileMonitoringFunction.java:416)
    org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.monitorDirAndForwardSplits(ContinuousFileMonitoringFunction.java:327)
    org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:292)
    org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
    org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
    org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
    org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    java.lang.Thread.run(Thread.java:748)
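A minimal sketch of that setting (the 60-second value is an assumed example; the actual value of cfgTaskTimeoutInterval used in the post above is not shown anywhere in the thread). setTaskCancellationInterval expects milliseconds:

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    // Widen the interval between cancellation interrupts (and the accompanying
    // "did not react to cancelling signal" warnings) from the default 30 s to 60 s.
    streamEnv.getConfig.setTaskCancellationInterval(60 * 1000L)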