Hi,
I have a problem with Flink version 1.3.1. I run a standalone cluster with two JobManagers and four TaskManagers; as the DFS I use Windows highly available storage mounted via the CIFS protocol. Sometimes Flink stops removing the checkpoint directories for a job and the completedCheckpoint files from the "high-availability.storageDir". To bring the cluster back to normal operation I have to remove all directories from the DFS and start everything from the beginning. Maybe some Flink users have had the same problem. For now I have no idea how to bring the cluster back to normal operation without deleting the directories from the DFS, and I don't want to delete them, because then I would need to redeploy all jobs.

Best regards
Szymon Szczypiński
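For reference, a minimal flink-conf.yaml sketch of a setup like the one described above; the ZooKeeper quorum and the CIFS mount paths are assumptions, not values taken from this thread:

    # ZooKeeper-based HA for the standalone cluster
    high-availability: zookeeper
    high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181   # assumed quorum
    # completedCheckpoint metadata files end up here (CIFS mount path is assumed)
    high-availability.storageDir: file:///mnt/flink/ha/

    # filesystem state backend; checkpoint directories end up here (path is assumed)
    state.backend: filesystem
    state.backend.fs.checkpointdir: file:///mnt/flink/checkpoints/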
Can you clarify which one does not get deleted? The file in the "high-availability.storageDir", or the checkpoint directory under "state.backend.fs.checkpointdir"?
Could you also tell us which file system you use? There is a known issue in some versions of Flink where S3 "directories" are not deleted: Hadoop's S3 marker files (the way Hadoop's s3n and s3a imitate directories in S3) are left behind. This is fixed in Flink 1.5. A workaround for Flink 1.4 is to use "flink-s3-fs-presto", which does not use these marker files.

On Thu, Mar 29, 2018 at 8:49 PM, Szymon Szczypiński <[hidden email]> wrote:
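A rough sketch of that Flink 1.4 workaround, assuming the standard distribution layout (the jar version below is a placeholder): the Presto-based S3 filesystem ships in the opt/ folder and is activated by copying it into lib/, after which checkpoint and HA paths can use s3:// URIs.

    # activate the Presto-based S3 filesystem (jar version is a placeholder)
    cp opt/flink-s3-fs-presto-1.4.2.jar lib/

    # then point the relevant directories at s3:// URIs in flink-conf.yaml, e.g.
    # state.backend.fs.checkpointdir: s3://my-bucket/checkpoints   (bucket name is hypothetical)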
Hi,

Best regards

On 2018-04-02 at 15:58, Stephan Ewen wrote:
Hi!
Sorry for the late response... Which Flink version are you on? I am wondering if this is somewhat related to that specific setup: a Windows DFS filesystem mounted on Linux via CIFS.

- For the "completedCheckpoint<SomeID>" files, the cleanup should happen in the "ZooKeeperCompletedCheckpointStore" when a checkpoint is dropped.
- For the "state.backend.fs.checkpointdir" directories ...

Best,
Stephan

On Sat, Apr 7, 2018 at 1:11 AM, Szymon Szczypiński <[hidden email]> wrote:
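One way to check whether ZooKeeper still references those checkpoints is to list the znodes behind the ZooKeeperCompletedCheckpointStore with zkCli.sh. A rough sketch, assuming default ZooKeeper path settings ("/flink" root, "default" cluster id); the server address and job id are placeholders:

    # connect to one of the ZooKeeper servers (address is a placeholder)
    ./zkCli.sh -server zk1:2181

    # list the retained checkpoint znodes for a job; the layout
    # <root>/<cluster-id>/checkpoints/<job-id> assumes default path settings
    ls /flink/default/checkpoints/<job-id>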
Hi,
the problem started on 1.3.1. Now I have upgraded to Flink 1.3.3; I moved the cluster to 1.3.3 because of https://issues.apache.org/jira/browse/FLINK-8807. I will check in debug mode why the cluster doesn't remove those files; maybe I will see why.

Best regards

On 22.04.2018 16:59, Stephan Ewen wrote:
Hi,
now I know why those files weren't "removed". They are removed, but very slowly. In my case (Flink 1.3) the problem is the line

    client.delete().inBackground(backgroundCallback, executor).forPath(path);

where the deletion runs in the background on an executor pool whose size is 2. When there are more files/dirs in "high-availability.storageDir" and "state.backend.fs.checkpointdir", the delete operations take longer and longer and the operations queued in the pool keep increasing. In my case the main problem is that I have 12 jobs deployed on the cluster and the checkpoint interval is set to 5 seconds. I know that I need to increase the interval between checkpoints; I will increase it to 1 or 5 minutes, depending on the job's business logic.

But I still have one question: where is the size of that executor pool set? I was analyzing the Flink code and still don't know where the size is configured. Maybe one of the users knows where the pool is created.

Best regards

On 22.04.2018 17:22, Szymon Szczypiński wrote:
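For illustration, a minimal, self-contained sketch of the Curator pattern quoted above, showing why a small fixed-size executor becomes a bottleneck: forPath() only submits the delete, and the callback, which in this scenario would be doing the slow DFS cleanup, is invoked later on the supplied executor, so with two threads the callbacks pile up in the queue. The ZooKeeper address, znode paths and pool size here are placeholders for the sketch, not Flink's actual wiring.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.api.BackgroundCallback;
    import org.apache.curator.retry.ExponentialBackoffRetry;

    public class BackgroundDeleteSketch {

        public static void main(String[] args) throws Exception {
            // placeholder connection string, not taken from this thread
            CuratorFramework client = CuratorFrameworkFactory.newClient(
                    "localhost:2181", new ExponentialBackoffRetry(1000, 3));
            client.start();

            // a 2-thread pool, mirroring the pool size observed above; any callback
            // beyond the two currently running ones waits in the pool's queue
            ExecutorService executor = Executors.newFixedThreadPool(2);

            // the callback is invoked on the executor; if it does slow work
            // (e.g. deleting checkpoint files on a slow DFS), it ties up a pool thread
            BackgroundCallback backgroundCallback = (cli, event) ->
                    System.out.println("delete finished for " + event.getPath()
                            + ", result code " + event.getResultCode());

            // same call pattern as the quoted line: forPath() returns immediately,
            // the actual delete and the callback run asynchronously
            for (int i = 0; i < 100; i++) {
                String path = "/demo/completedCheckpoint" + i; // hypothetical znode paths
                client.delete().inBackground(backgroundCallback, executor).forPath(path);
            }

            executor.shutdown();
            client.close();
        }
    }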