blob store defaults to /tmp and files get deleted


blob store defaults to /tmp and files get deleted

Shannon Carey
A few of my jobs recently failed and showed this exception:

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
ClassLoader info: URL ClassLoader:
    file: '/tmp/blobStore-5f023409-6af5-4de6-8ed0-e80a2eb9633e/cache/blob_d9a9fb884f3b436030afcf7b8e1bce678acceaf2' (invalid JAR: zip file is empty)
Class not resolvable through given classloader.
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:208)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
	at java.lang.Thread.run(Thread.java:745)

As you can see, Flink is storing things underneath /tmp, which is the (undocumented) default for the blob store. As you may know, on Linux, there's typically a program such as tmpwatch which is run periodically to clear out data from /tmp.
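For context (a purely illustrative example; the exact flags, retention period, and mechanism vary by distribution), on RHEL-style systems this is usually a daily cron job along the lines of:

    # e.g. /etc/cron.daily/tmpwatch (illustrative)
    /usr/sbin/tmpwatch 240 /tmp    # delete files not accessed for 240 hours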

Flink also uses /tmp as the default for jobmanager.web.tmpdir (and jobmanager.web.upload.dir in 1.2).

Therefore, assuming this is indeed the cause of the job failure/exception, it seems highly advisable that when you run a Flink cluster you configure blob.storage.directory and jobmanager.web.tmpdir to point to a specific folder that is not beneath /tmp. I don't know whether there is any material on setting up a production cluster, but this definitely seems like a necessary setting if you want to avoid problems. Enabling High Availability mode should also be on that list, I think.
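For example, a flink-conf.yaml along these lines (a minimal sketch; the paths and ZooKeeper addresses are placeholders I made up, so adjust them for your environment):

    # keep Flink's working files out of /tmp (illustrative paths)
    blob.storage.directory: /var/lib/flink/blobs
    jobmanager.web.tmpdir: /var/lib/flink/web
    jobmanager.web.upload.dir: /var/lib/flink/web-uploads   # 1.2 only

    # High Availability (placeholder values)
    high-availability: zookeeper
    high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
    high-availability.storageDir: hdfs:///flink/ha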

-Shannon

Re: blob store defaults to /tmp and files get deleted

Ufuk Celebi
Hey Shannon,

good idea! We currently have this:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/ops/production_ready.html

It has a strong focus on managed state and does not cover the points you mentioned.

Would you like to create an issue for adding this to the production checklist? I think it's valuable feedback.

– Ufuk



Re: blob store defaults to /tmp and files get deleted

Stephan Ewen
Hi Shannon!

In the latest HA and BlobStore changes (1.3), Flink uses "/tmp" only for caching and will re-obtain the files from persistent storage.

I think we should make this a bigger point, even:
  - Flink should not use "/tmp" at all (except in mini cluster mode).
  - Yarn and Mesos should always use the "local directory" for temporary files; those are cleaned up anyway.
  - For the Standalone Setup, one should configure a suitable temp storage dir (and everything should be relative to that).

Stephan




Re: blob store defaults to /tmp and files get deleted

Stephan Ewen
Hi Shannon!

Looking into this a bit more, I think it is something altogether different:

  - Flink (via the user code class loader) actually holds a reference to the JAR files in "/tmp", so even if "/tmp" gets wiped, the JAR file remains usable by the class loader (at least under Linux/Unix/Mac). The "/tmp" wipe removes only the parent directory references to the file; the open reference from the class loader prevents the file contents from being deleted (see the first sketch below).

  - The empty file you have looks more like a failed fetch attempt from the blob manager. That should be fixed by making sure that blobs are fetched to a temp file which is renamed only upon completion. That way, we prevent the blob hash from pointing to a file that exists but is invalid (see the second sketch below).
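To make the first point concrete, here is a minimal, self-contained Java sketch (class and file names are made up; this is not Flink code) showing that an open stream keeps a file readable after its directory entry is removed:

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    public class OpenHandleDemo {
        public static void main(String[] args) throws IOException {
            // hypothetical stand-in for a blob-store JAR under /tmp
            Path blob = Files.createTempFile("blob_", ".jar");
            Files.write(blob, new byte[]{1, 2, 3, 4});

            try (FileInputStream in = new FileInputStream(blob.toFile())) {
                // simulate a /tmp cleaner removing the directory entry
                Files.delete(blob);
                // on Linux/Unix/Mac the open descriptor still sees the data;
                // the inode is reclaimed only once the last handle is closed
                System.out.println("bytes still readable: " + in.available()); // prints 4
            }
        }
    }

The second point boils down to the usual write-then-rename pattern. Again only an illustrative sketch with hypothetical names, not the actual BlobCache code:

    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;

    public class AtomicBlobFetch {
        // Download into a temporary file and rename it to its final, hash-named
        // location only after the copy completed, so readers never see a
        // half-written file under the blob hash.
        static void fetchBlob(InputStream remote, Path finalLocation) throws IOException {
            Path tmp = Files.createTempFile(finalLocation.getParent(), "incoming_", ".part");
            try {
                Files.copy(remote, tmp, StandardCopyOption.REPLACE_EXISTING);
                Files.move(tmp, finalLocation, StandardCopyOption.ATOMIC_MOVE);
            } finally {
                Files.deleteIfExists(tmp); // clean up if the copy or move failed
            }
        }
    }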

Best,
Stephan



Re: blob store defaults to /tmp and files get deleted

Shannon Carey
Stephan,


You mention "Flink (via the user code class loader) actually holds a reference to the JAR files in "/tmp", so even if "/tmp" get wiped, the JAR file remains usable by the class loader". In my understanding, even if that's true, it doesn't work over a failure of the JobManager/TaskManager process, because the handle would be lost and the file would be gone.

We're still running Flink 1.2.1, so maybe we're missing out on some of the improvements that have been made. However, we recently had a problem with a batch (DataSet) job not restarting successfully, apparently after a JobManager failure. This particular job runs on AWS EMR (on YARN), which means that only one JobManager runs at a time and gets restarted when it fails.

Here's what I can see from the logs. When the job restarts, it goes from CREATED -> RUNNING state, and then logs:

23:23:56,798 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        (flink-akka.actor.default-dispatcher-55): Job com.expedia…MyJob (c58185a78dd64cfc9f12374bd1f9a679) switched from state RUNNING to SUSPENDED.
java.lang.Exception: JobManager is no longer the leader.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:319)

I assume that's normal/expected, because the JobManager was restarted but some portion of the job state is still referring to the old one. Next, YarnJobManager logs: "Attempting to recover job c58185a78dd64cfc9f12374bd1f9a679." However, it subsequently fails:

2017-08-03 00:09:18,991 WARN  org.apache.flink.yarn.YarnJobManager                          (flink-akka.actor.default-dispatcher-96): Failed to recover job c58185a78dd64cfc9f12374bd1f9a679.
java.lang.Exception: Failed to retrieve the submitted job graph from state handle.
at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:180)
Caused by: java.lang.RuntimeException: Unable to instantiate the hadoop input format
at org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.readObject(HadoopInputFormatBase.java:319)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:305)
at org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:58)
at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:178)
... 15 more
Caused by: java.lang.ClassNotFoundException: org.apache.parquet.avro.AvroParquetInputFormat
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.readObject(HadoopInputFormatBase.java:317)
... 69 more

The missing class comes from our job, so it seems like the job jar isn't present on the classpath of the JobManager. When I look at the contents of our configured blob storage directory (we're not using /tmp), I see subfolders like:

blobStore-7d40f1b9-7b06-400f-8c05-b5456adcd7f1
blobStore-f2d7974c-7d86-4b11-a7fb-d1936a4593ed

Only one of the two has a JAR in it, so it looks like a new directory is created for each new JobManager. When I look in ZooKeeper at nodes such as "/flink/main/jobgraphs/c58185a78dd64cfc9f12374bd1f9a679", I don't see those directories mentioned. Can someone explain how Flink retrieves the job JAR for a job retry after the JobManager has failed? Are we running into a Flink bug here?

Thanks for the info,
Shannon


Re: blob store defaults to /tmp and files get deleted

Eron Wright
The directory referred to by `blob.storage.directory` is best described as a local cache. For recovery purposes the JARs are also stored in `high-availability.storageDir`. At least that's my reading of the code in 1.2. Maybe there's some YARN-specific behavior too; sorry if this information is incomplete.
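In config terms that would look roughly like this (illustrative paths; based on my reading of 1.2, so please verify against your version):

    # local, per-process cache of fetched blobs -- safe to lose, it is re-fetched
    blob.storage.directory: /var/lib/flink/blob-cache

    # durable copy used by HA recovery for job JARs and metadata
    high-availability.storageDir: hdfs:///flink/ha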


