Strange behaviour with checkpointing and custom FilePathFilter


Strange behaviour with checkpointing and custom FilePathFilter

Averell
Good day everyone,

I have about 100 thousand files to read, and a custom FilePathFilter with a
simple filterPath method defined as below (the custom part only checks the
file size and skips files with size 0):
  override def filterPath(filePath: Path): Boolean = {
    filePath == null ||
      filePath.getName.startsWith(".") ||
      filePath.getName.startsWith("_") ||
      filePath.getName.contains(FilePathFilter.HADOOP_COPYING) || {
        def fileStatus = filePath.getFileSystem.getFileStatus(filePath)
        !fileStatus.isDir && fileStatus.getLen == 0
      }
  }

It runs fine either when I disable checkpointing or when I use the default
FilePathFilter. It takes about 7 minutes to finish processing all files
(from source to sink).
However, when I have both the custom filter and checkpointing, it usually
takes 15-20 minutes for Flink to start reading my files (in the Flink GUI,
the CustomFileSource monitor generates 0 records during that 15-20 minute
period).

Could someone please help with this?

Thank you very much for your time.
Best regards,
Averell




Re: Strange behaviour with checkpointing and custom FilePathFilter

vino yang
Hi Averell,

Is this all of the custom code for "CustomFileSource"?
If not, can you share the entire file with us? Also, setting the log level
to DEBUG will help you analyze and locate the problem.
If you can't come to a conclusion, you can share the log with us.

Thanks, vino.



Re: Strange behaviour with checkpointing and custom FilePathFilter

Averell
Hi Vino,

I am using a custom FileInputFormat, but the problem mentioned above only
occurs when I use a custom FilePathFilter.

My whole file for that custom FilePathFilter is quoted below.

Regarding enabling DEBUG, which classes/packages should I turn DEBUG on
for? I am afraid that turning DEBUG on at the global level would be too
heavy.

Thanks and regards,
Averell


==================
import org.apache.flink.api.common.io.FilePathFilter
import org.apache.flink.core.fs.Path
import org.slf4j.LoggerFactory

object SdcFilePathFilter {
  private val LOG = LoggerFactory.getLogger(classOf[SdcFilePathFilter])
}

class SdcFilePathFilter(lookBackPeriod: Long, homePath: Path) extends FilePathFilter {
  private val homeDepth = homePath.depth()

  override def filterPath(filePath: Path): Boolean = {
    filePath == null ||
      filePath.getName.startsWith(".") ||
      filePath.getName.startsWith("_") ||
      filePath.getName.contains(FilePathFilter.HADOOP_COPYING) || {
        def fileStatus = filePath.getFileSystem.getFileStatus(filePath)
        !fileStatus.isDir && fileStatus.getLen == 0
      }
  }
}
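
For completeness, a minimal sketch of how the filter is wired into the file
input format (TextInputFormat and the S3 path are placeholders standing in
for the real SDC input format and path):

    import org.apache.flink.api.java.io.TextInputFormat
    import org.apache.flink.core.fs.Path

    val inputPath = new Path("s3://my-bucket/input")   // placeholder path
    val format = new TextInputFormat(inputPath)        // stands in for the custom SDC format
    format.setFilesFilter(new SdcFilePathFilter(3600000L, inputPath))  // example 1-hour look-back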




Re: Strange behaviour with checkpointing and custom FilePathFilter

Averell
Hi Vino, and all,

I tried to avoid the step of getting the file status, and found that the
problem is no longer there. I guess doing that for every single file out of
100K+ files on S3 caused some issue with checkpointing.
I am still trying to find the cause, but with lower priority now.
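
For reference, the filter with the per-file getFileStatus lookup removed is
essentially just the default name-based checks, i.e. a sketch like:

    override def filterPath(filePath: Path): Boolean = {
      filePath == null ||
        filePath.getName.startsWith(".") ||
        filePath.getName.startsWith("_") ||
        filePath.getName.contains(FilePathFilter.HADOOP_COPYING)
    }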

Thanks for your help.

Regards,
Averell  




Re: Strange behaviour with checkpointing and custom FilePathFilter

Kostas Kloudas
Hi Averell,

Happy to hear that the problem is no longer there; if you have more news
from your debugging, let us know.

The thing I wanted to mention is that, from what you are describing, the
problem does not seem to be related to checkpointing, but to the fact that
applying your filter to the hundreds of thousands of small files takes time.

This may help with your debugging.

Cheers,
Kostas


Re: Strange behaviour with checkpointing and custom FilePathFilter

Averell
Hi Kostas,

Yes, applying the filter on the 100K files takes time, and the 15-minute
delay I observed is definitely caused by that large number of files and the
cost of each individual file status check. However, the delay is much
smaller when checkpointing is off.
Within that 15 minutes, the checkpointing process is not triggered, though.

Thanks and regards,
Averell




Re: Strange behaviour with checkpointing and custom FilePathFilter

Kostas Kloudas
Hi Averell,

Can you describe your settings in a bit more detail?

For example, are you reading in PROCESS_CONTINUOUSLY mode or PROCESS_ONCE?
What is your checkpoint interval?

The questions above are to understand why checkpoints are not processed within these 15 minutes.

Kostas


Re: Strange behaviour with checkpointing and custom FilePathFilter

Averell
Hi Kostas,

I use PROCESS_CONTINUOUSLY mode with a checkpoint interval of 20 minutes.
When I said "Within that 15 minutes, the checkpointing process is not
triggered" in my previous email, I was not complaining that checkpointing is
not running, but pointing out that the slowness is not due to an ongoing
checkpoint.
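
For context, the setup is roughly along the lines of the sketch below
(TextInputFormat, the S3 path, and the 60-second monitoring interval are
placeholders for the real job's values):

    import org.apache.flink.api.java.io.TextInputFormat
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.functions.source.FileProcessingMode
    import org.apache.flink.streaming.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(20 * 60 * 1000L)   // 20-minute checkpoint interval

    val inputPath = "s3://my-bucket/input"     // placeholder path
    val format = new TextInputFormat(new Path(inputPath))  // stands in for the custom SDC format
    val lines = env.readFile(format, inputPath,
      FileProcessingMode.PROCESS_CONTINUOUSLY, 60000L)     // re-scan the directory every 60 s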

Thanks and regards,
Averell




Re: Strange behaviour with checkpointing and custom FilePathFilter

Kostas Kloudas
I see,

Thanks for the clarification.

Cheers,
Kostas


Re: Strange behaviour with checkpointing and custom FilePathFilter

Averell
Thank you Kostas for spending time on my case.

Related to the issue I mentioned, I have another issue caused by having a
lot of files to list. From the error message (quoted below), I understand
that the listing was taking more than 30 seconds, and the JM thought that it
hung and killed it. Is it possible to increase this 30-second timer?

Thanks and regards,
Averell

===== UPDATE =====
For the question w.r.t. the timer, I found the answer:
    streamEnv.getConfig.setTaskCancellationInterval(cfgTaskTimeoutInterval)
================
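
In context, that looks roughly like the sketch below (the 120-second value
is only an example; cfgTaskTimeoutInterval should be whatever is long enough
for the S3 listing):

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    // Give the task more time to react to a cancellation signal before it
    // is reported as stuck (the default interval is 30 000 ms).
    val cfgTaskTimeoutInterval = 120 * 1000L   // milliseconds, example value
    streamEnv.getConfig.setTaskCancellationInterval(cfgTaskTimeoutInterval)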


2018-09-25 12:01:13.222 [Canceler/Interrupts for Source: Custom File Source (1/1) (a5f5434070044510eafc9103bc24af43).] WARN org.apache.flink.runtime.taskmanager.Task - Task 'Source: Custom File Source (1/1)' did not react to cancelling signal for 30 seconds, but is stuck in method:
 java.net.URI$Parser.scan(URI.java:2998)
java.net.URI$Parser.parseAuthority(URI.java:3138)
java.net.URI$Parser.parseHierarchical(URI.java:3097)
java.net.URI$Parser.parse(URI.java:3053)
java.net.URI.<init>(URI.java:746)
org.apache.hadoop.fs.Path.makeQualified(Path.java:467)
org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:464)
com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem$$Lambda$63/515305348.apply(Unknown Source)
com.amazon.ws.emr.hadoop.fs.s3n.BasicFileStatusFactory.newFile(BasicFileStatusFactory.java:69)
com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.newFile(S3NativeFileSystem.java:1154)
com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.listStatus(S3NativeFileSystem.java:962)
com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.listStatus(S3NativeFileSystem.java:914)
com.amazon.ws.emr.hadoop.fs.EmrFileSystem.listStatus(EmrFileSystem.java:364)
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:157)
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.listStatus(SafetyNetWrapperFileSystem.java:97)
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.listEligibleFiles(ContinuousFileMonitoringFunction.java:395)
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.listEligibleFiles(ContinuousFileMonitoringFunction.java:416)
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.listEligibleFiles(ContinuousFileMonitoringFunction.java:416)
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.monitorDirAndForwardSplits(ContinuousFileMonitoringFunction.java:327)
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:292)
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
java.lang.Thread.run(Thread.java:748)


