`env.java.opts` not persisting after job canceled or failed and then restarted

Aaron Levin
Hello!

*tl;dr*: settings in `env.java.opts` seem to stop taking effect once a job is canceled or fails and is then restarted (with or without savepoints/checkpoints). If I restart the task managers, `env.java.opts` takes effect again and our job runs without failure. More below.

We consume Snappy-compressed sequence files in our Flink job. This requires access to the Hadoop native libraries. In the `flink-conf.yaml` for both the task manager and the job manager, we set:

```
env.java.opts: -Djava.library.path=/usr/local/hadoop/lib/native
```

If I launch our job on freshly restarted task managers, the job operates fine. If at some point I cancel the job, or if the job restarts for some other reason, it begins to crashloop because it tries to open a Snappy-compressed file but doesn't have access to the codec from the native Hadoop libraries in `/usr/local/hadoop/lib/native`. If I then restart the task managers while the job is crashlooping, the job starts running without any codec failures.

The only explanation I can come up with for the Snappy failures is that `env.java.opts` is somehow not being passed through to the job on restart.
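
One way I plan to double-check this at runtime (a rough, untested sketch; the operator name is made up for illustration) is to log the library path from inside the job when a task starts:

```scala
// Sketch only: a pass-through operator that logs the JVM library path in open(),
// so restarted tasks show which java.library.path they actually received.
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.slf4j.LoggerFactory

class LibraryPathLogger[T] extends RichMapFunction[T, T] {
  @transient private lazy val log = LoggerFactory.getLogger(getClass)

  override def open(parameters: Configuration): Unit =
    log.info(s"java.library.path=${System.getProperty("java.library.path")}")

  override def map(value: T): T = value
}
```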

Does anyone know what's going on? Am I missing some additional configuration? I really appreciate any help!

About our setup:

- Flink Version: 1.7.0
- Deployment: Standalone in HA
- Hadoop/S3 setup: we do *not* set `HADOOP_CLASSPATH`. We use Flink’s shaded jars to access our files in S3. We do not use the `bundled-with-hadoop` distribution of Flink.

Best,

Aaron Levin

Re: `env.java.opts` not persisting after job canceled or failed and then restarted

Ufuk Celebi
Hey Aaron,

sorry for the late reply.

(1) I think I was able to reproduce this issue using snappy-java. I've filed a ticket here:
https://issues.apache.org/jira/browse/FLINK-11402. Can you check whether the ticket description is in line with what you are experiencing? Most importantly, do you see the same exception reported after cancelling and restarting the job?

(2) I don't think it's caused by the environment options not being
picked up. You can check the head of the log files of the JobManager
or TaskManager to verify that your provided option is picked up as
expected. You should see something similar to this:

```
2019-01-21 22:53:49,863 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - --------------------------------------------------------------------------------
2019-01-21 22:53:49,864 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Starting StandaloneSessionClusterEntrypoint (Version: 1.7.0, Rev:49da9f9, Date:28.11.2018 @ 17:59:06 UTC)
...
2019-01-21 22:53:49,865 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  -  JVM Options:
2019-01-21 22:53:49,865 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  -     -Xms1024m
2019-01-21 22:53:49,865 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  -     -Xmx1024m
You are looking for this line ----> 2019-01-21 22:53:49,865 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  -     -Djava.library.path=/.../org/xerial/snappy/native/Mac/x86_64/ <----
2019-01-21 22:53:49,865 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  -     -Dlog.file=/.../flink-1.7.0/log/flink-standalonesession-0.local.log
...
2019-01-21 22:53:49,866 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  -  Program Arguments:
2019-01-21 22:53:49,866 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  -     --configDir
2019-01-21 22:53:49,866 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  -     /.../flink-1.7.0/conf
2019-01-21 22:53:49,866 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  -     --executionMode
2019-01-21 22:53:49,866 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  -     cluster
...
2019-01-21 22:53:49,866 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - --------------------------------------------------------------------------------
```

Can you verify that you see the log messages as expected?

(3) As noted in FLINK-11402, is it possible to package the snappy library as part of your user code instead of loading the library via java.library.path? In my example, that seems to work fine.
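
(For reference, a minimal sbt sketch of that approach: the snappy-java artifact ships its native libraries inside the jar and extracts them at runtime, so no java.library.path entry is needed. The version below is only an example.)

```scala
// build.sbt (sketch): bundle snappy-java in the user jar instead of relying on
// -Djava.library.path. Pick the version that matches your Hadoop/Flink setup.
libraryDependencies += "org.xerial.snappy" % "snappy-java" % "1.1.7.2"
```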

– Ufuk


Re: `env.java.opts` not persisting after job canceled or failed and then restarted

Aaron Levin
Hey,

Thanks so much for the help! This is awesome. I'll start looking into all of this right away and report back.

Best,

Aaron Levin


Re: `env.java.opts` not persisting after job canceled or failed and then restarted

Aaron Levin
Hey Ufuk,

So, I looked into this a little bit:

1. Clarification: my issue is with the Hadoop-related snappy libraries and not libsnappy itself (this is my bad for not being clearer, sorry!). I already have `libsnappy` on my classpath, but I am looking into including the Hadoop snappy libraries.
2. Exception: I don't see the class-loading error. I'm going to add some more instrumentation and see if I can get a clearer stack trace (right now I get an NPE on closing a sequence file in a finalizer; when I last logged the exception it was something deep in Hadoop's snappy libs. I'll get clarification soon).
3. I'm looking into including Hadoop's snappy libs in my jar, and we'll see if that resolves the problem.

Thanks again for your help!

Best,

Aaron Levin


Re: `env.java.opts` not persisting after job canceled or failed and then restarted

Aaron Levin
Hi Ufuk,

Two updates:

1. As suggested in the ticket, I naively copied every `.so` in `hadoop-3.0.0/lib/native/` into `/lib/`, and this did not seem to help. My knowledge of how shared libs get picked up is hazy, so I'm not sure whether blindly copying them like that should work. I did check what `System.getProperty("java.library.path")` returns at the call site, and it's: `java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib`
2. The exception I see comes from `hadoop.util.NativeCodeLoader.buildSupportsSnappy` (stack trace below). This uses `System.loadLibrary("hadoop")`.

```
[2019-01-23 19:52:33.081216] java.lang.UnsatisfiedLinkError: org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
[2019-01-23 19:52:33.081376]  at org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method)
[2019-01-23 19:52:33.081406]  at org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)
[2019-01-23 19:52:33.081429]  at org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:195)
[2019-01-23 19:52:33.081457]  at org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:181)
[2019-01-23 19:52:33.081494]  at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:2037)
[2019-01-23 19:52:33.081517]  at org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1923)
[2019-01-23 19:52:33.081549]  at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1872)
... (redacted) ...
[2019-01-23 19:52:33.081728]  at scala.collection.immutable.List.foreach(List.scala:392)
... (redacted) ...
[2019-01-23 19:52:33.081832]  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
[2019-01-23 19:52:33.081854]  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
[2019-01-23 19:52:33.081882]  at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
[2019-01-23 19:52:33.081904]  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
[2019-01-23 19:52:33.081946]  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
[2019-01-23 19:52:33.081967]  at java.lang.Thread.run(Thread.java:748)
```
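
For what it's worth, the extra instrumentation I'm adding is roughly the following (an untested sketch; the object name is made up, and it only assumes hadoop-common and slf4j on the classpath), so I can log what Hadoop's native loader reports right where the sequence files are opened:

```scala
// Sketch only: NativeCodeLoader.isNativeCodeLoaded returns true only if
// System.loadLibrary("hadoop") succeeded in this class's classloader.
import org.apache.hadoop.util.NativeCodeLoader
import org.slf4j.LoggerFactory

object NativeProbe {
  private val log = LoggerFactory.getLogger(getClass)

  def logStatus(): Unit = {
    log.info(s"hadoop native code loaded: ${NativeCodeLoader.isNativeCodeLoaded}")
    log.info(s"java.library.path=${System.getProperty("java.library.path")}")
  }
}
```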


Re: `env.java.opts` not persisting after job canceled or failed and then restarted

Aaron Levin
Hi Ufuk,

One more update: I tried copying all the Hadoop native `.so` files (mainly `libhadoop.so`) into `/lib`, and I am still experiencing the issue I reported. I also tried naively adding the `.so` files to the jar with the Flink application and am still experiencing the issue (however, I'm going to investigate this further, as I might not have done it correctly).

Best,

Aaron Levin


Re: `env.java.opts` not persisting after job canceled or failed and then restarted

Aaron Levin
Hi Ufuk,

I'm starting to believe the bug is much deeper than the originally reported error, because putting the libraries in `/usr/lib` or `/lib` does not work. This morning I dug into why putting `libhadoop.so` into `/usr/lib` didn't work, despite that directory being in the `java.library.path` at the call site of the error. I wrote a small program to test the loading of native libraries, and it was able to successfully load `libhadoop.so`. I'm very perplexed. Could this be related to the way Flink shades its Hadoop dependencies?

Here is my program and its output:

```
$ cat LibTest.scala
package com.redacted.flink

object LibTest {
  def main(args: Array[String]): Unit = {
    val library = args(0)
    System.out.println(s"java.library.path=${System.getProperty("java.library.path")}")
    System.out.println(s"Attempting to load $library")
    System.out.flush()
    System.loadLibrary(library)
    System.out.println(s"Successfully loaded ")
    System.out.flush()
}
```

I then tried running that on one of the task managers with `hadoop` as an argument:

```
$ java -jar lib_test_deploy.jar hadoop
java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
Attempting to load hadoop
Exception in thread "main" java.lang.UnsatisfiedLinkError: no hadoop in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
at java.lang.Runtime.loadLibrary0(Runtime.java:870)
at java.lang.System.loadLibrary(System.java:1122)
at com.stripe.flink.LibTest$.main(LibTest.scala:11)
at com.stripe.flink.LibTest.main(LibTest.scala)
```

I then copied the native libraries into `/usr/lib/` and ran it again:

```
$ sudo cp /usr/local/hadoop/lib/native/libhadoop.so /usr/lib/
$ java -jar lib_test_deploy.jar hadoop
java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
Attempting to load hadoop
Successfully loaded
```

Any ideas? 


Re: `env.java.opts` not persisting after job canceled or failed and then restarted

Aaron Levin
Hi Ufuk,

Update: I've pinned down the issue. It's multiple classloaders loading `libhadoop.so`:

```
failed to load native hadoop with error: java.lang.UnsatisfiedLinkError: Native Library /usr/lib/libhadoop.so already loaded in another classloader
```

I'm not quite sure what the solution is. Ideally Flink would destroy a classloader when a job is canceled, but perhaps there's a JVM limitation there? Putting the libraries into `/usr/lib` or `/lib` does not work (as suggested by Chesnay in the ticket), as I get the same error. I might see if I can put a jar with `org.apache.hadoop.io.compress` in `/flink/install/lib` and then remove it from my jar. It's not an ideal solution, but I can't think of anything else.
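
If I go that route, the config change I have in mind is roughly the following (just a sketch of the idea, not something I've verified for this case; it assumes the jar containing those classes also sits under the TaskManager's `lib/` directory so the parent classloader can find it):

```yaml
# Sketch: resolve the Hadoop classes through the parent (TaskManager) classloader,
# so libhadoop.so is loaded once per TaskManager process rather than once per job.
classloader.parent-first-patterns.additional: "org.apache.hadoop."
```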

Best,

Aaron Levin


Re: `env.java.opts` not persisting after job canceled or failed and then restarted

Guowei Ma
This may be caused by the fact that a JVM process can only load a given .so once, so a tricky workaround is to rename it.

Sent from my iPhone

在 2019年1月25日,上午7:12,Aaron Levin <[hidden email]> 写道:

Hi Ufuk,

Update: I've pinned down the issue. It's multiple classloaders loading `libhadoop.so`:

```
failed to load native hadoop with error: java.lang.UnsatisfiedLinkError: Native Library /usr/lib/libhadoop.so already loaded in another classloader
```

I'm not quite sure what the solution is. Ideally flink would destroy a classloader when a job is canceled, but perhaps there's a jvm limitation there? Putting the libraries into `/usr/lib` or `/lib` does not work (as suggested by Chesnay in the ticket) as I get the same error. I might see if I can put a jar with `org.apache.hadoop.common.io.compress` in `/flink/install/lib` and then remove it from my jar. It's not an ideal solution but I can't think of anything else.

Best,

Aaron Levin

On Thu, Jan 24, 2019 at 12:18 PM Aaron Levin <[hidden email]> wrote:
Hi Ufuk,

I'm starting to believe the bug is much deeper than the originally reported error because putting the libraries in `/usr/lib` or `/lib` does not work. This morning I dug into why putting `libhadoop.so` into `/usr/lib` didn't work, despite that being in the `java.library.path` at the call site of the error. I wrote a small program to test the loading of native libraries, and it was able to successfully load `libhadoop.so`. I'm very perplexed. Could this be related to the way flink shades hadoop stuff? 

Here is my program and its output:

```
$ cat LibTest.scala
package com.redacted.flink

object LibTest {
  def main(args: Array[String]): Unit = {
    val library = args(0)
    System.out.println(s"java.library.path=${System.getProperty("java.library.path")}")
    System.out.println(s"Attempting to load $library")
    System.out.flush()
    System.loadLibrary(library)
    System.out.println(s"Successfully loaded ")
    System.out.flush()
  }
}
```

I then tried running that on one of the task managers with `hadoop` as an argument:

```
$ java -jar lib_test_deploy.jar hadoop
java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
Attempting to load hadoop
Exception in thread "main" java.lang.UnsatisfiedLinkError: no hadoop in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
at java.lang.Runtime.loadLibrary0(Runtime.java:870)
at java.lang.System.loadLibrary(System.java:1122)
at com.stripe.flink.LibTest$.main(LibTest.scala:11)
at com.stripe.flink.LibTest.main(LibTest.scala)
```

I then copied the native libraries into `/usr/lib/` and ran it again:

```
$ sudo cp /usr/local/hadoop/lib/native/libhadoop.so /usr/lib/
$ java -jar lib_test_deploy.jar hadoop
java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
Attempting to load hadoop
Successfully loaded
```

Any ideas? 

On Wed, Jan 23, 2019 at 7:13 PM Aaron Levin <[hidden email]> wrote:
Hi Ufuk,

One more update: I tried copying all the hadoop native `.so` files (mainly `libhadoop.so`) into `/lib` and I am still experiencing the issue I reported. I also tried naively adding the `.so` files to the jar with the flink application and am still experiencing the issue I reported (however, I'm going to investigate this further as I might not have done it correctly).

Best,

Aaron Levin

On Wed, Jan 23, 2019 at 3:18 PM Aaron Levin <[hidden email]> wrote:
Hi Ufuk,

Two updates:

1. As suggested in the ticket, I naively copied every `.so` in `hadoop-3.0.0/lib/native/` into `/lib/` and this did not seem to help. My knowledge of how shared libs get picked up is hazy, so I'm not sure if blindly copying them like that should work. I did check what `System.getProperty("java.library.path")` returns at the call-site and it's: java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
2. The exception I see comes from `hadoop.util.NativeCodeLoader.buildSupportsSnappy` (stack-trace below). This uses `System.loadLibrary("hadoop")`.

[2019-01-23 19:52:33.081216] java.lang.UnsatisfiedLinkError: org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
[2019-01-23 19:52:33.081376]  at org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method)
[2019-01-23 19:52:33.081406]  at org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)
[2019-01-23 19:52:33.081429]  at org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:195)
[2019-01-23 19:52:33.081457]  at org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:181)
[2019-01-23 19:52:33.081494]  at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:2037)
[2019-01-23 19:52:33.081517]  at org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1923)
[2019-01-23 19:52:33.081549]  at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1872)
... (redacted) ...
[2019-01-23 19:52:33.081728]  at scala.collection.immutable.List.foreach(List.scala:392)
... (redacted) ...
[2019-01-23 19:52:33.081832]  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
[2019-01-23 19:52:33.081854]  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
[2019-01-23 19:52:33.081882]  at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
[2019-01-23 19:52:33.081904]  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
[2019-01-23 19:52:33.081946]  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
[2019-01-23 19:52:33.081967]  at java.lang.Thread.run(Thread.java:748)
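
For what it's worth, a small diagnostic along these lines (a sketch, assuming `hadoop-common` is on the classpath; the object name is made up) can show which classloader `NativeCodeLoader` was loaded by and whether the native build even supports Snappy:

```
import org.apache.hadoop.util.NativeCodeLoader

object NativeCheck {
  def report(): Unit = {
    println(s"java.library.path=${System.getProperty("java.library.path")}")
    // NativeCodeLoader's static initializer is what calls System.loadLibrary("hadoop"),
    // so the .so ends up bound to whichever classloader loaded this class.
    println(s"NativeCodeLoader loaded by: ${classOf[NativeCodeLoader].getClassLoader}")
    println(s"isNativeCodeLoaded=${NativeCodeLoader.isNativeCodeLoaded()}")
    if (NativeCodeLoader.isNativeCodeLoaded()) {
      println(s"buildSupportsSnappy=${NativeCodeLoader.buildSupportsSnappy()}")
    }
  }
}
```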

On Tue, Jan 22, 2019 at 2:31 PM Aaron Levin <[hidden email]> wrote:
Hey Ufuk,

So, I looked into this a little bit:

1. clarification: my issues are with the hadoop-related snappy libraries and not libsnappy itself (this is my bad for not being clearer, sorry!). I already have `libsnappy` on my classpath, but I am looking into including the hadoop snappy libraries.
2. exception: I don't see the class loading error. I'm going to try to put some more instrumentation and see if I can get a clearer stacktrace (right now I get an NPE on closing a sequence file in a finalizer - when I last logged the exception it was something deep in hadoop's snappy libs - I'll get clarification soon).
3. I'm looking into including hadoop's snappy libs in my jar and we'll see if that resolves the problem.

Thanks again for your help!

Best,

Aaron Levin

On Tue, Jan 22, 2019 at 10:47 AM Aaron Levin <[hidden email]> wrote:
Hey,

Thanks so much for the help! This is awesome. I'll start looking into all of this right away and report back.

Best,

Aaron Levin

On Mon, Jan 21, 2019 at 5:16 PM Ufuk Celebi <[hidden email]> wrote:
Hey Aaron,

sorry for the late reply.

(1) I think I was able to reproduce this issue using snappy-java. I've
filed a ticket here:
https://issues.apache.org/jira/browse/FLINK-11402. Can you check the
ticket description whether it's in line with what you are
experiencing? Most importantly, do you see the same Exception being
reported after cancelling and re-starting the job?

(2) I don't think it's caused by the environment options not being
picked up. You can check the head of the log files of the JobManager
or TaskManager to verify that your provided option is picked up as
expected. You should see something similar to this:

2019-01-21 22:53:49,863 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
--------------------------------------------------------------------------------
2019-01-21 22:53:49,864 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
Starting StandaloneSessionClusterEntrypoint (Version: 1.7.0,
Rev:49da9f9, Date:28.11.2018 @ 17:59:06 UTC)
...
2019-01-21 22:53:49,865 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  JVM
Options:
2019-01-21 22:53:49,865 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
-Xms1024m
2019-01-21 22:53:49,865 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
-Xmx1024m
You are looking for this line ----> 2019-01-21 22:53:49,865 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
-Djava.library.path=/.../org/xerial/snappy/native/Mac/x86_64/ <----
2019-01-21 22:53:49,865 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
-Dlog.file=/.../flink-1.7.0/log/flink-standalonesession-0.local.log
...
2019-01-21 22:53:49,866 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
Program Arguments:
2019-01-21 22:53:49,866 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
--configDir
2019-01-21 22:53:49,866 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
/.../flink-1.7.0/conf
2019-01-21 22:53:49,866 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
--executionMode
2019-01-21 22:53:49,866 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
cluster
...
2019-01-21 22:53:49,866 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
--------------------------------------------------------------------------------

Can you verify that you see the log messages as expected?

(3) As noted in FLINK-11402, is it possible to package the snappy library
as part of your user code instead of loading the library via
java.library.path? In my example, that seems to work fine.

– Ufuk

On Thu, Jan 17, 2019 at 5:53 PM Aaron Levin <[hidden email]> wrote:
>
> Hello!
>
> *tl;dr*: settings in `env.java.opts` seem to stop having impact when a job is canceled or fails and then is restarted (with or without savepoint/checkpoints). If I restart the task-managers, the `env.java.opts` seem to start having impact again and our job will run without failure. More below.
>
> We consume Snappy-compressed sequence files in our flink job. This requires access to the hadoop native libraries. In our `flink-conf.yaml` for both the task manager and the job manager, we put:
>
> ```
> env.java.opts: -Djava.library.path=/usr/local/hadoop/lib/native
> ```
>
> If I launch our job on freshly-restarted task managers, the job operates fine. If at some point I cancel the job or if the job restarts for some other reason, the job will begin to crashloop because it tries to open a Snappy-compressed file but doesn't have access to the codec from the native hadoop libraries in `/usr/local/hadoop/lib/native`. If I then restart the task manager while the job is crashlooping, the job starts running without any codec failures.
>
> The only reason I can conjure that would cause the Snappy compression to fail is if the `env.java.opts` were not being passed through to the job on restart for some reason.
>
> Does anyone know what's going on? Am I missing some additional configuration? I really appreciate any help!
>
> About our setup:
>
> - Flink Version: 1.7.0
> - Deployment: Standalone in HA
> - Hadoop/S3 setup: we do *not* set `HADOOP_CLASSPATH`. We use Flink’s shaded jars to access our files in S3. We do not use the `bundled-with-hadoop` distribution of Flink.
>
> Best,
>
> Aaron Levin

Re: `env.java.opts` not persisting after job canceled or failed and then restarted

Aaron Levin
I don't control the code calling `System.loadLibrary("hadoop")` so that's not an option for me, unfortunately.

On Thu, Jan 24, 2019 at 7:47 PM Guowei Ma <[hidden email]> wrote:
This may be caused by the fact that a JVM process can only load a given `.so` once, so a tricky workaround is to rename it.

Sent from my iPhone

On Jan 25, 2019, at 7:12 AM, Aaron Levin <[hidden email]> wrote:

Hi Ufuk,

Update: I've pinned down the issue. It's multiple classloaders loading `libhadoop.so`:

```
failed to load native hadoop with error: java.lang.UnsatisfiedLinkError: Native Library /usr/lib/libhadoop.so already loaded in another classloader
```

I'm not quite sure what the solution is. Ideally flink would destroy a classloader when a job is canceled, but perhaps there's a jvm limitation there? Putting the libraries into `/usr/lib` or `/lib` does not work (as suggested by Chesnay in the ticket) as I get the same error. I might see if I can put a jar with `org.apache.hadoop.common.io.compress` in `/flink/install/lib` and then remove it from my jar. It's not an ideal solution but I can't think of anything else.

Best,

Aaron Levin

On Thu, Jan 24, 2019 at 12:18 PM Aaron Levin <[hidden email]> wrote:
Hi Ufuk,

I'm starting to believe the bug is much deeper than the originally reported error because putting the libraries in `/usr/lib` or `/lib` does not work. This morning I dug into why putting `libhadoop.so` into `/usr/lib` didn't work, despite that being in the `java.library.path` at the call site of the error. I wrote a small program to test the loading of native libraries, and it was able to successfully load `libhadoop.so`. I'm very perplexed. Could this be related to the way flink shades hadoop stuff? 

Here is my program and its output:

```
$ cat LibTest.scala
package com.redacted.flink

object LibTest {
  def main(args: Array[String]): Unit = {
    val library = args(0)
    System.out.println(s"java.library.path=${System.getProperty("java.library.path")}")
    System.out.println(s"Attempting to load $library")
    System.out.flush()
    System.loadLibrary(library)
    System.out.println(s"Successfully loaded ")
    System.out.flush()
  }
}
```

I then tried running that on one of the task managers with `hadoop` as an argument:

```
$ java -jar lib_test_deploy.jar hadoop
java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
Attempting to load hadoop
Exception in thread "main" java.lang.UnsatisfiedLinkError: no hadoop in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
at java.lang.Runtime.loadLibrary0(Runtime.java:870)
at java.lang.System.loadLibrary(System.java:1122)
at com.stripe.flink.LibTest$.main(LibTest.scala:11)
at com.stripe.flink.LibTest.main(LibTest.scala)
```

I then copied the native libraries into `/usr/lib/` and ran it again:

```
$ sudo cp /usr/local/hadoop/lib/native/libhadoop.so /usr/lib/
$ java -jar lib_test_deploy.jar hadoop
java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
Attempting to load hadoop
Successfully loaded
```

Any ideas? 

On Wed, Jan 23, 2019 at 7:13 PM Aaron Levin <[hidden email]> wrote:
Hi Ufuk,

One more update: I tried copying all the hadoop native `.so` files (mainly `libhadoop.so`) into `/lib` and I am still experiencing the issue I reported. I also tried naively adding the `.so` files to the jar with the flink application and am still experiencing the issue I reported (however, I'm going to investigate this further as I might not have done it correctly).

Best,

Aaron Levin

On Wed, Jan 23, 2019 at 3:18 PM Aaron Levin <[hidden email]> wrote:
Hi Ufuk,

Two updates:

1. As suggested in the ticket, I naively copied every `.so` in `hadoop-3.0.0/lib/native/` into `/lib/` and this did not seem to help. My knowledge of how shared libs get picked up is hazy, so I'm not sure if blindly copying them like that should work. I did check what `System.getProperty("java.library.path")` returns at the call-site and it's: java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
2. The exception I see comes from `hadoop.util.NativeCodeLoader.buildSupportsSnappy` (stack-trace below). This uses `System.loadLibrary("hadoop")`.

[2019-01-23 19:52:33.081216] java.lang.UnsatisfiedLinkError: org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
[2019-01-23 19:52:33.081376]  at org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method)
[2019-01-23 19:52:33.081406]  at org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)
[2019-01-23 19:52:33.081429]  at org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:195)
[2019-01-23 19:52:33.081457]  at org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:181)
[2019-01-23 19:52:33.081494]  at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:2037)
[2019-01-23 19:52:33.081517]  at org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1923)
[2019-01-23 19:52:33.081549]  at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1872)
... (redacted) ...
[2019-01-23 19:52:33.081728]  at scala.collection.immutable.List.foreach(List.scala:392)
... (redacted) ...
[2019-01-23 19:52:33.081832]  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
[2019-01-23 19:52:33.081854]  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
[2019-01-23 19:52:33.081882]  at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
[2019-01-23 19:52:33.081904]  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
[2019-01-23 19:52:33.081946]  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
[2019-01-23 19:52:33.081967]  at java.lang.Thread.run(Thread.java:748)

On Tue, Jan 22, 2019 at 2:31 PM Aaron Levin <[hidden email]> wrote:
Hey Ufuk,

So, I looked into this a little bit:

1. clarification: my issues are with the hadoop-related snappy libraries and not libsnappy itself (this is my bad for not being clearer, sorry!). I already have `libsnappy` on my classpath, but I am looking into including the hadoop snappy libraries.
2. exception: I don't see the class loading error. I'm going to try to put some more instrumentation and see if I can get a clearer stacktrace (right now I get an NPE on closing a sequence file in a finalizer - when I last logged the exception it was something deep in hadoop's snappy libs - I'll get clarification soon).
3. I'm looking into including hadoop's snappy libs in my jar and we'll see if that resolves the problem.

Thanks again for your help!

Best,

Aaron Levin

On Tue, Jan 22, 2019 at 10:47 AM Aaron Levin <[hidden email]> wrote:
Hey,

Thanks so much for the help! This is awesome. I'll start looking into all of this right away and report back.

Best,

Aaron Levin

On Mon, Jan 21, 2019 at 5:16 PM Ufuk Celebi <[hidden email]> wrote:
Hey Aaron,

sorry for the late reply.

(1) I think I was able to reproduce this issue using snappy-java. I've
filed a ticket here:
https://issues.apache.org/jira/browse/FLINK-11402. Can you check the
ticket description whether it's in line with what you are
experiencing? Most importantly, do you see the same Exception being
reported after cancelling and re-starting the job?

(2) I don't think it's caused by the environment options not being
picked up. You can check the head of the log files of the JobManager
or TaskManager to verify that your provided option is picked up as
expected. You should see something similar to this:

2019-01-21 22:53:49,863 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
--------------------------------------------------------------------------------
2019-01-21 22:53:49,864 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
Starting StandaloneSessionClusterEntrypoint (Version: 1.7.0,
Rev:49da9f9, Date:28.11.2018 @ 17:59:06 UTC)
...
2019-01-21 22:53:49,865 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  JVM
Options:
2019-01-21 22:53:49,865 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
-Xms1024m
2019-01-21 22:53:49,865 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
-Xmx1024m
You are looking for this line ----> 2019-01-21 22:53:49,865 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
-Djava.library.path=/.../org/xerial/snappy/native/Mac/x86_64/ <----
2019-01-21 22:53:49,865 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
-Dlog.file=/.../flink-1.7.0/log/flink-standalonesession-0.local.log
...
2019-01-21 22:53:49,866 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
Program Arguments:
2019-01-21 22:53:49,866 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
--configDir
2019-01-21 22:53:49,866 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
/.../flink-1.7.0/conf
2019-01-21 22:53:49,866 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
--executionMode
2019-01-21 22:53:49,866 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
cluster
...
2019-01-21 22:53:49,866 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
--------------------------------------------------------------------------------

Can you verify that you see the log messages as expected?

(3) As noted in FLINK-11402, is it possible to package the snappy library
as part of your user code instead of loading the library via
java.library.path? In my example, that seems to work fine.

– Ufuk

On Thu, Jan 17, 2019 at 5:53 PM Aaron Levin <[hidden email]> wrote:
>
> Hello!
>
> *tl;dr*: settings in `env.java.opts` seem to stop having impact when a job is canceled or fails and then is restarted (with or without savepoint/checkpoints). If I restart the task-managers, the `env.java.opts` seem to start having impact again and our job will run without failure. More below.
>
> We consume Snappy-compressed sequence files in our flink job. This requires access to the hadoop native libraries. In our `flink-conf.yaml` for both the task manager and the job manager, we put:
>
> ```
> env.java.opts: -Djava.library.path=/usr/local/hadoop/lib/native
> ```
>
> If I launch our job on freshly-restarted task managers, the job operates fine. If at some point I cancel the job or if the job restarts for some other reason, the job will begin to crashloop because it tries to open a Snappy-compressed file but doesn't have access to the codec from the native hadoop libraries in `/usr/local/hadoop/lib/native`. If I then restart the task manager while the job is crashlooping, the job starts running without any codec failures.
>
> The only reason I can conjure that would cause the Snappy compression to fail is if the `env.java.opts` were not being passed through to the job on restart for some reason.
>
> Does anyone know what's going on? Am I missing some additional configuration? I really appreciate any help!
>
> About our setup:
>
> - Flink Version: 1.7.0
> - Deployment: Standalone in HA
> - Hadoop/S3 setup: we do *not* set `HADOOP_CLASSPATH`. We use Flink’s shaded jars to access our files in S3. We do not use the `bundled-with-hadoop` distribution of Flink.
>
> Best,
>
> Aaron Levin

Re: `env.java.opts` not persisting after job canceled or failed and then restarted

Ufuk Celebi
Hey Aaron,

sorry for the late reply (again).

(1) I think that your final result is in line with what I have
reproduced in https://issues.apache.org/jira/browse/FLINK-11402.

(2) I think renaming the file would not help as it will still be
loaded multiple times when the job restarts (as it happens in
FLINK-11402).

(3) I'll try to check whether Flink's shading of Hadoop is related to
this. I don't think so though. @Chesnay (cc'd): What do you think?

(4) @Aaron: Can you tell me which Hadoop libraries you use and share
some code so I can try to reproduce this exactly on my side? Judging
from the earlier stack traces you have shared, I'm assuming you are
trying to read Snappy-compressed sequence files.

– Ufuk

On Fri, Jan 25, 2019 at 4:37 PM Aaron Levin <[hidden email]> wrote:

>
> I don't control the code calling `System.loadLibrary("hadoop")` so that's not an option for me, unfortunately.
>
> On Thu, Jan 24, 2019 at 7:47 PM Guowei Ma <[hidden email]> wrote:
>>
>> This may be caused by the fact that a JVM process can only load a given `.so` once, so a tricky workaround is to rename it.
>>
>> Sent from my iPhone
>>
>> On Jan 25, 2019, at 7:12 AM, Aaron Levin <[hidden email]> wrote:
>>
>> Hi Ufuk,
>>
>> Update: I've pinned down the issue. It's multiple classloaders loading `libhadoop.so`:
>>
>> ```
>> failed to load native hadoop with error: java.lang.UnsatisfiedLinkError: Native Library /usr/lib/libhadoop.so already loaded in another classloader
>> ```
>>
>> I'm not quite sure what the solution is. Ideally flink would destroy a classloader when a job is canceled, but perhaps there's a jvm limitation there? Putting the libraries into `/usr/lib` or `/lib` does not work (as suggested by Chesnay in the ticket) as I get the same error. I might see if I can put a jar with `org.apache.hadoop.common.io.compress` in `/flink/install/lib` and then remove it from my jar. It's not an ideal solution but I can't think of anything else.
>>
>> Best,
>>
>> Aaron Levin
>>
>> On Thu, Jan 24, 2019 at 12:18 PM Aaron Levin <[hidden email]> wrote:
>>>
>>> Hi Ufuk,
>>>
>>> I'm starting to believe the bug is much deeper than the originally reported error because putting the libraries in `/usr/lib` or `/lib` does not work. This morning I dug into why putting `libhadoop.so` into `/usr/lib` didn't work, despite that being in the `java.library.path` at the call site of the error. I wrote a small program to test the loading of native libraries, and it was able to successfully load `libhadoop.so`. I'm very perplexed. Could this be related to the way flink shades hadoop stuff?
>>>
>>> Here is my program and its output:
>>>
>>> ```
>>> $ cat LibTest.scala
>>> package com.redacted.flink
>>>
>>> object LibTest {
>>>   def main(args: Array[String]): Unit = {
>>>     val library = args(0)
>>>     System.out.println(s"java.library.path=${System.getProperty("java.library.path")}")
>>>     System.out.println(s"Attempting to load $library")
>>>     System.out.flush()
>>>     System.loadLibrary(library)
>>>     System.out.println(s"Successfully loaded ")
>>>     System.out.flush()
>>>   }
>>> }
>>> ```
>>>
>>> I then tried running that on one of the task managers with `hadoop` as an argument:
>>>
>>> ```
>>> $ java -jar lib_test_deploy.jar hadoop
>>> java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
>>> Attempting to load hadoop
>>> Exception in thread "main" java.lang.UnsatisfiedLinkError: no hadoop in java.library.path
>>> at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
>>> at java.lang.Runtime.loadLibrary0(Runtime.java:870)
>>> at java.lang.System.loadLibrary(System.java:1122)
>>> at com.stripe.flink.LibTest$.main(LibTest.scala:11)
>>> at com.stripe.flink.LibTest.main(LibTest.scala)
>>> ```
>>>
>>> I then copied the native libraries into `/usr/lib/` and ran it again:
>>>
>>> ```
>>> $ sudo cp /usr/local/hadoop/lib/native/libhadoop.so /usr/lib/
>>> $ java -jar lib_test_deploy.jar hadoop
>>> java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
>>> Attempting to load hadoop
>>> Successfully loaded
>>> ```
>>>
>>> Any ideas?
>>>
>>> On Wed, Jan 23, 2019 at 7:13 PM Aaron Levin <[hidden email]> wrote:
>>>>
>>>> Hi Ufuk,
>>>>
>>>> One more update: I tried copying all the hadoop native `.so` files (mainly `libhadoop.so`) into `/lib` and I am still experiencing the issue I reported. I also tried naively adding the `.so` files to the jar with the flink application and am still experiencing the issue I reported (however, I'm going to investigate this further as I might not have done it correctly).
>>>>
>>>> Best,
>>>>
>>>> Aaron Levin
>>>>
>>>> On Wed, Jan 23, 2019 at 3:18 PM Aaron Levin <[hidden email]> wrote:
>>>>>
>>>>> Hi Ufuk,
>>>>>
>>>>> Two updates:
>>>>>
>>>>> 1. As suggested in the ticket, I naively copied every `.so` in `hadoop-3.0.0/lib/native/` into `/lib/` and this did not seem to help. My knowledge of how shared libs get picked up is hazy, so I'm not sure if blindly copying them like that should work. I did check what `System.getProperty("java.library.path")` returns at the call-site and it's: java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
>>>>> 2. The exception I see comes from `hadoop.util.NativeCodeLoader.buildSupportsSnappy` (stack-trace below). This uses `System.loadLibrary("hadoop")`.
>>>>>
>>>>> [2019-01-23 19:52:33.081216] java.lang.UnsatisfiedLinkError: org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
>>>>> [2019-01-23 19:52:33.081376]  at org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method)
>>>>> [2019-01-23 19:52:33.081406]  at org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)
>>>>> [2019-01-23 19:52:33.081429]  at org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:195)
>>>>> [2019-01-23 19:52:33.081457]  at org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:181)
>>>>> [2019-01-23 19:52:33.081494]  at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:2037)
>>>>> [2019-01-23 19:52:33.081517]  at org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1923)
>>>>> [2019-01-23 19:52:33.081549]  at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1872)
>>>>> ... (redacted) ...
>>>>> [2019-01-23 19:52:33.081728]  at scala.collection.immutable.List.foreach(List.scala:392)
>>>>> ... (redacted) ...
>>>>> [2019-01-23 19:52:33.081832]  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
>>>>> [2019-01-23 19:52:33.081854]  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>>>> [2019-01-23 19:52:33.081882]  at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
>>>>> [2019-01-23 19:52:33.081904]  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>>>> [2019-01-23 19:52:33.081946]  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>>> [2019-01-23 19:52:33.081967]  at java.lang.Thread.run(Thread.java:748)
>>>>>
>>>>> On Tue, Jan 22, 2019 at 2:31 PM Aaron Levin <[hidden email]> wrote:
>>>>>>
>>>>>> Hey Ufuk,
>>>>>>
>>>>>> So, I looked into this a little bit:
>>>>>>
>>>>>> 1. clarification: my issues are with the hadoop-related snappy libraries and not libsnappy itself (this is my bad for not being clearer, sorry!). I already have `libsnappy` on my classpath, but I am looking into including the hadoop snappy libraries.
>>>>>> 2. exception: I don't see the class loading error. I'm going to try to put some more instrumentation and see if I can get a clearer stacktrace (right now I get an NPE on closing a sequence file in a finalizer - when I last logged the exception it was something deep in hadoop's snappy libs - I'll get clarification soon).
>>>>>> 3. I'm looking into including hadoop's snappy libs in my jar and we'll see if that resolves the problem.
>>>>>>
>>>>>> Thanks again for your help!
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Aaron Levin
>>>>>>
>>>>>> On Tue, Jan 22, 2019 at 10:47 AM Aaron Levin <[hidden email]> wrote:
>>>>>>>
>>>>>>> Hey,
>>>>>>>
>>>>>>> Thanks so much for the help! This is awesome. I'll start looking into all of this right away and report back.
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> Aaron Levin
>>>>>>>
>>>>>>> On Mon, Jan 21, 2019 at 5:16 PM Ufuk Celebi <[hidden email]> wrote:
>>>>>>>>
>>>>>>>> Hey Aaron,
>>>>>>>>
>>>>>>>> sorry for the late reply.
>>>>>>>>
>>>>>>>> (1) I think I was able to reproduce this issue using snappy-java. I've
>>>>>>>> filed a ticket here:
>>>>>>>> https://issues.apache.org/jira/browse/FLINK-11402. Can you check the
>>>>>>>> ticket description whether it's in line with what you are
>>>>>>>> experiencing? Most importantly, do you see the same Exception being
>>>>>>>> reported after cancelling and re-starting the job?
>>>>>>>>
>>>>>>>> (2) I don't think it's caused by the environment options not being
>>>>>>>> picked up. You can check the head of the log files of the JobManager
>>>>>>>> or TaskManager to verify that your provided option is picked up as
>>>>>>>> expected. You should see something similar to this:
>>>>>>>>
>>>>>>>> 2019-01-21 22:53:49,863 INFO
>>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>>>>>>>> --------------------------------------------------------------------------------
>>>>>>>> 2019-01-21 22:53:49,864 INFO
>>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>>>>>>>> Starting StandaloneSessionClusterEntrypoint (Version: 1.7.0,
>>>>>>>> Rev:49da9f9, Date:28.11.2018 @ 17:59:06 UTC)
>>>>>>>> ...
>>>>>>>> 2019-01-21 22:53:49,865 INFO
>>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  JVM
>>>>>>>> Options:
>>>>>>>> 2019-01-21 22:53:49,865 INFO
>>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>>>>>>>> -Xms1024m
>>>>>>>> 2019-01-21 22:53:49,865 INFO
>>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>>>>>>>> -Xmx1024m
>>>>>>>> You are looking for this line ----> 2019-01-21 22:53:49,865 INFO
>>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>>>>>>>> -Djava.library.path=/.../org/xerial/snappy/native/Mac/x86_64/ <----
>>>>>>>> 2019-01-21 22:53:49,865 INFO
>>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>>>>>>>> -Dlog.file=/.../flink-1.7.0/log/flink-standalonesession-0.local.log
>>>>>>>> ...
>>>>>>>> 2019-01-21 22:53:49,866 INFO
>>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>>>>>>>> Program Arguments:
>>>>>>>> 2019-01-21 22:53:49,866 INFO
>>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>>>>>>>> --configDir
>>>>>>>> 2019-01-21 22:53:49,866 INFO
>>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>>>>>>>> /.../flink-1.7.0/conf
>>>>>>>> 2019-01-21 22:53:49,866 INFO
>>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>>>>>>>> --executionMode
>>>>>>>> 2019-01-21 22:53:49,866 INFO
>>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>>>>>>>> cluster
>>>>>>>> ...
>>>>>>>> 2019-01-21 22:53:49,866 INFO
>>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>>>>>>>> --------------------------------------------------------------------------------
>>>>>>>>
>>>>>>>> Can you verify that you see the log messages as expected?
>>>>>>>>
>>>>>>>> (3) As noted in FLINK-11402, is it possible to package the snappy library
>>>>>>>> as part of your user code instead of loading the library via
>>>>>>>> java.library.path? In my example, that seems to work fine.
>>>>>>>>
>>>>>>>> – Ufuk
>>>>>>>>
>>>>>>>> On Thu, Jan 17, 2019 at 5:53 PM Aaron Levin <[hidden email]> wrote:
>>>>>>>> >
>>>>>>>> > Hello!
>>>>>>>> >
>>>>>>>> > *tl;dr*: settings in `env.java.opts` seem to stop having impact when a job is canceled or fails and then is restarted (with or without savepoint/checkpoints). If I restart the task-managers, the `env.java.opts` seem to start having impact again and our job will run without failure. More below.
>>>>>>>> >
>>>>>>>> > We consume Snappy-compressed sequence files in our flink job. This requires access to the hadoop native libraries. In our `flink-conf.yaml` for both the task manager and the job manager, we put:
>>>>>>>> >
>>>>>>>> > ```
>>>>>>>> > env.java.opts: -Djava.library.path=/usr/local/hadoop/lib/native
>>>>>>>> > ```
>>>>>>>> >
>>>>>>>> > If I launch our job on freshly-restarted task managers, the job operates fine. If at some point I cancel the job or if the job restarts for some other reason, the job will begin to crashloop because it tries to open a Snappy-compressed file but doesn't have access to the codec from the native hadoop libraries in `/usr/local/hadoop/lib/native`. If I then restart the task manager while the job is crashlooping, the job starts running without any codec failures.
>>>>>>>> >
>>>>>>>> > The only reason I can conjure that would cause the Snappy compression to fail is if the `env.java.opts` were not being passed through to the job on restart for some reason.
>>>>>>>> >
>>>>>>>> > Does anyone know what's going on? Am I missing some additional configuration? I really appreciate any help!
>>>>>>>> >
>>>>>>>> > About our setup:
>>>>>>>> >
>>>>>>>> > - Flink Version: 1.7.0
>>>>>>>> > - Deployment: Standalone in HA
>>>>>>>> > - Hadoop/S3 setup: we do *not* set `HADOOP_CLASSPATH`. We use Flink’s shaded jars to access our files in S3. We do not use the `bundled-with-hadoop` distribution of Flink.
>>>>>>>> >
>>>>>>>> > Best,
>>>>>>>> >
>>>>>>>> > Aaron Levin

Re: `env.java.opts` not persisting after job canceled or failed and then restarted

Aaron Levin
Hi Ufuk,

I'll answer your question, but first I'll give you an update on how we resolved the issue:

* adding `org.apache.hadoop.io.compress.SnappyCodec` to `classloader.parent-first-patterns.additional` in `flink-conf.yaml` (though, putting `org.apache.hadoop.util.NativeCodeLoader` also worked)
* putting a jar with `hadoop-common` + its transitive dependencies, then using jarjar[0] to `keep org.apache.hadoop.io.compress.SnappyCodec` (and its transitive dependencies). So we end up with a jar that has `SnappyCodec` and whatever it needs to call transitively. We put this jar on the task manager classpath.

I believe `SnappyCodec` was being called via our code. This worked the first time but deploying a second time caused `libhadoop.so` to be loaded in a second class loader. By putting a jar with `SnappyCodec` and its transitive dependencies on the task manager classpath and specifying that `SnappyCodec` needs to be loaded from the parent classloader, we ensure that only one classloader loads `libhadoop.so`. I don't think this is the best way to achieve what we want, but it works for now.
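
For concreteness, the relevant `flink-conf.yaml` entries end up looking roughly like this (a sketch of the settings described above, not a drop-in config):

```
# Native hadoop libraries installed on the machines:
env.java.opts: -Djava.library.path=/usr/local/hadoop/lib/native

# Load SnappyCodec through the parent classloader so that libhadoop.so is only
# ever bound to a single classloader across job restarts/redeploys.
# (Using org.apache.hadoop.util.NativeCodeLoader here also worked.)
classloader.parent-first-patterns.additional: org.apache.hadoop.io.compress.SnappyCodec
```

The jar that actually provides `SnappyCodec` (and its transitive dependencies) then has to be on the task manager classpath, per the second bullet above.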

Next steps: if no one is on it, I can take a stab at updating the documentation to clarify how to debug and resolve native library loading issues. This was a nice learning experience and I think it'll be helpful to have this in the docs for those who aren't well-versed in how classloading on the JVM works!

To answer your questions:

1. We install hadoop on our machines and tell flink task managers to access it via `env.java.opts: -Djava.library.path=/usr/local/hadoop/lib/native` in `flink-conf.yaml` 
2. We put flink's shaded hadoop-fs-s3 on both the task manager and job manager classpath (I believe this is only used by the Job Managers when they interact with S3 for checkpoints etc. I don't believe any user code is using this).
3. Our flink applications consist of a "fat jar" that has some `org.apache.hadoop` dependencies bundled with it. I believe this is why we end up loading `SnappyCodec` twice and triggering this issue.
4. For example code: we have a small wrapper around `org.apache.flink.api.common.io.FileInputFormat` which does the work with sequence files. It looks like (after removing some stuff to make it more clear):

```
abstract class FlinkSequenceFileInputFormat[T, K <: Writable, V <: Writable](
    typeInformation: TypeInformation[T]
) extends FileInputFormat[T]
    with ResultTypeQueryable[T] {
  @transient private var bufferedNextRecord: T = _
  @transient private var hadoopStream: HadoopFSDataInputStream = _
  @transient private var sequenceFileReader: SequenceFile.Reader = _

  unsplittable = true
  enumerateNestedFiles = true
 
  // *****************************************
  // This is where we'd see exceptions.
  // *****************************************
  override def open(fileSplit: FileInputSplit): Unit = {
    super.open(fileSplit)
    val config = new Configuration()
    hadoopStream = WrappedHadoopInputStream.wrap(stream)
    sequenceFileReader = new SequenceFile.Reader(config, SequenceFile.Reader.stream(hadoopStream))
    bufferNextRecord()
  }
...
}

// AND

class WrappedHadoopInputStream(underlying: FlinkFSDataInputStream)
    extends InputStream
    with Seekable
    with PositionedReadable {

  def read(): Int = underlying.read()
  def seek(pos: Long): Unit = underlying.seek(pos)
  def getPos: Long = underlying.getPos
}
...
```

Thanks for all your help, I appreciate it! I wouldn't have been able to debug and resolve this if it wasn't for you filing the ticket. Thank you so much!

[0] https://github.com/pantsbuild/jarjar

Aaron Levin

On Mon, Jan 28, 2019 at 4:16 AM Ufuk Celebi <[hidden email]> wrote:
Hey Aaron,

sorry for the late reply (again).

(1) I think that your final result is in line with what I have
reproduced in https://issues.apache.org/jira/browse/FLINK-11402.

(2) I think renaming the file would not help as it will still be
loaded multiple times when the job restarts (as it happens in
FLINK-11402).

(3) I'll try to check whether Flink's shading of Hadoop is related to
this. I don't think so though. @Chesnay (cc'd): What do you think?

(4) @Aaron: Can you tell me which Hadoop libraries you use and share
some code so I can try to reproduce this exactly on my side? Judging
from the earlier stack traces you have shared, I'm assuming you are
trying to read Snappy-compressed sequence files.

– Ufuk

On Fri, Jan 25, 2019 at 4:37 PM Aaron Levin <[hidden email]> wrote:
>
> I don't control the code calling `System.loadLibrary("hadoop")` so that's not an option for me, unfortunately.
>
> On Thu, Jan 24, 2019 at 7:47 PM Guowei Ma <[hidden email]> wrote:
>>
>> This may be caused by the fact that a JVM process can only load a given `.so` once, so a tricky workaround is to rename it.
>>
>> Sent from my iPhone
>>
>> On Jan 25, 2019, at 7:12 AM, Aaron Levin <[hidden email]> wrote:
>>
>> Hi Ufuk,
>>
>> Update: I've pinned down the issue. It's multiple classloaders loading `libhadoop.so`:
>>
>> ```
>> failed to load native hadoop with error: java.lang.UnsatisfiedLinkError: Native Library /usr/lib/libhadoop.so already loaded in another classloader
>> ```
>>
>> I'm not quite sure what the solution is. Ideally flink would destroy a classloader when a job is canceled, but perhaps there's a jvm limitation there? Putting the libraries into `/usr/lib` or `/lib` does not work (as suggested by Chesnay in the ticket) as I get the same error. I might see if I can put a jar with `org.apache.hadoop.common.io.compress` in `/flink/install/lib` and then remove it from my jar. It's not an ideal solution but I can't think of anything else.
>>
>> Best,
>>
>> Aaron Levin
>>
>> On Thu, Jan 24, 2019 at 12:18 PM Aaron Levin <[hidden email]> wrote:
>>>
>>> Hi Ufuk,
>>>
>>> I'm starting to believe the bug is much deeper than the originally reported error because putting the libraries in `/usr/lib` or `/lib` does not work. This morning I dug into why putting `libhadoop.so` into `/usr/lib` didn't work, despite that being in the `java.library.path` at the call site of the error. I wrote a small program to test the loading of native libraries, and it was able to successfully load `libhadoop.so`. I'm very perplexed. Could this be related to the way flink shades hadoop stuff?
>>>
>>> Here is my program and its output:
>>>
>>> ```
>>> $ cat LibTest.scala
>>> package com.redacted.flink
>>>
>>> object LibTest {
>>>   def main(args: Array[String]): Unit = {
>>>     val library = args(0)
>>>     System.out.println(s"java.library.path=${System.getProperty("java.library.path")}")
>>>     System.out.println(s"Attempting to load $library")
>>>     System.out.flush()
>>>     System.loadLibrary(library)
>>>     System.out.println(s"Successfully loaded ")
>>>     System.out.flush()
>>>   }
>>> }
>>> ```
>>>
>>> I then tried running that on one of the task managers with `hadoop` as an argument:
>>>
>>> ```
>>> $ java -jar lib_test_deploy.jar hadoop
>>> java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
>>> Attempting to load hadoop
>>> Exception in thread "main" java.lang.UnsatisfiedLinkError: no hadoop in java.library.path
>>> at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
>>> at java.lang.Runtime.loadLibrary0(Runtime.java:870)
>>> at java.lang.System.loadLibrary(System.java:1122)
>>> at com.stripe.flink.LibTest$.main(LibTest.scala:11)
>>> at com.stripe.flink.LibTest.main(LibTest.scala)
>>> ```
>>>
>>> I then copied the native libraries into `/usr/lib/` and ran it again:
>>>
>>> ```
>>> $ sudo cp /usr/local/hadoop/lib/native/libhadoop.so /usr/lib/
>>> $ java -jar lib_test_deploy.jar hadoop
>>> java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
>>> Attempting to load hadoop
>>> Successfully loaded
>>> ```
>>>
>>> Any ideas?
>>>
>>> On Wed, Jan 23, 2019 at 7:13 PM Aaron Levin <[hidden email]> wrote:
>>>>
>>>> Hi Ufuk,
>>>>
>>>> One more update: I tried copying all the hadoop native `.so` files (mainly `libhadoop.so`) into `/lib` and I am still experiencing the issue I reported. I also tried naively adding the `.so` files to the jar with the flink application and am still experiencing the issue I reported (however, I'm going to investigate this further as I might not have done it correctly).
>>>>
>>>> Best,
>>>>
>>>> Aaron Levin
>>>>
>>>> On Wed, Jan 23, 2019 at 3:18 PM Aaron Levin <[hidden email]> wrote:
>>>>>
>>>>> Hi Ufuk,
>>>>>
>>>>> Two updates:
>>>>>
>>>>> 1. As suggested in the ticket, I naively copied every `.so` in `hadoop-3.0.0/lib/native/` into `/lib/` and this did not seem to help. My knowledge of how shared libs get picked up is hazy, so I'm not sure if blindly copying them like that should work. I did check what `System.getProperty("java.library.path")` returns at the call-site and it's: java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
>>>>> 2. The exception I see comes from `hadoop.util.NativeCodeLoader.buildSupportsSnappy` (stack-trace below). This uses `System.loadLibrary("hadoop")`.
>>>>>
>>>>> [2019-01-23 19:52:33.081216] java.lang.UnsatisfiedLinkError: org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
>>>>> [2019-01-23 19:52:33.081376]  at org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method)
>>>>> [2019-01-23 19:52:33.081406]  at org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)
>>>>> [2019-01-23 19:52:33.081429]  at org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:195)
>>>>> [2019-01-23 19:52:33.081457]  at org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:181)
>>>>> [2019-01-23 19:52:33.081494]  at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:2037)
>>>>> [2019-01-23 19:52:33.081517]  at org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1923)
>>>>> [2019-01-23 19:52:33.081549]  at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1872)
>>>>> ... (redacted) ...
>>>>> [2019-01-23 19:52:33.081728]  at scala.collection.immutable.List.foreach(List.scala:392)
>>>>> ... (redacted) ...
>>>>> [2019-01-23 19:52:33.081832]  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
>>>>> [2019-01-23 19:52:33.081854]  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>>>> [2019-01-23 19:52:33.081882]  at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
>>>>> [2019-01-23 19:52:33.081904]  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>>>> [2019-01-23 19:52:33.081946]  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>>> [2019-01-23 19:52:33.081967]  at java.lang.Thread.run(Thread.java:748)
>>>>>
>>>>> On Tue, Jan 22, 2019 at 2:31 PM Aaron Levin <[hidden email]> wrote:
>>>>>>
>>>>>> Hey Ufuk,
>>>>>>
>>>>>> So, I looked into this a little bit:
>>>>>>
>>>>>> 1. clarification: my issues are with the hadoop-related snappy libraries and not libsnappy itself (this is my bad for not being clearer, sorry!). I already have `libsnappy` on my classpath, but I am looking into including the hadoop snappy libraries.
>>>>>> 2. exception: I don't see the class loading error. I'm going to try to put some more instrumentation and see if I can get a clearer stacktrace (right now I get an NPE on closing a sequence file in a finalizer - when I last logged the exception it was something deep in hadoop's snappy libs - I'll get clarification soon).
>>>>>> 3. I'm looking into including hadoop's snappy libs in my jar and we'll see if that resolves the problem.
>>>>>>
>>>>>> Thanks again for your help!
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Aaron Levin
>>>>>>
>>>>>> On Tue, Jan 22, 2019 at 10:47 AM Aaron Levin <[hidden email]> wrote:
>>>>>>>
>>>>>>> Hey,
>>>>>>>
>>>>>>> Thanks so much for the help! This is awesome. I'll start looking into all of this right away and report back.
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> Aaron Levin
>>>>>>>
>>>>>>> On Mon, Jan 21, 2019 at 5:16 PM Ufuk Celebi <[hidden email]> wrote:
>>>>>>>>
>>>>>>>> Hey Aaron,
>>>>>>>>
>>>>>>>> sorry for the late reply.
>>>>>>>>
>>>>>>>> (1) I think I was able to reproduce this issue using snappy-java. I've
>>>>>>>> filed a ticket here:
>>>>>>>> https://issues.apache.org/jira/browse/FLINK-11402. Can you check the
>>>>>>>> ticket description whether it's in line with what you are
>>>>>>>> experiencing? Most importantly, do you see the same Exception being
>>>>>>>> reported after cancelling and re-starting the job?
>>>>>>>>
>>>>>>>> (2) I don't think it's caused by the environment options not being
>>>>>>>> picked up. You can check the head of the log files of the JobManager
>>>>>>>> or TaskManager to verify that your provided option is picked up as
>>>>>>>> expected. You should see something similar to this:
>>>>>>>>
>>>>>>>> 2019-01-21 22:53:49,863 INFO
>>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>>>>>>>> --------------------------------------------------------------------------------
>>>>>>>> 2019-01-21 22:53:49,864 INFO
>>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>>>>>>>> Starting StandaloneSessionClusterEntrypoint (Version: 1.7.0,
>>>>>>>> Rev:49da9f9, Date:28.11.2018 @ 17:59:06 UTC)
>>>>>>>> ...
>>>>>>>> 2019-01-21 22:53:49,865 INFO
>>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  JVM
>>>>>>>> Options:
>>>>>>>> 2019-01-21 22:53:49,865 INFO
>>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>>>>>>>> -Xms1024m
>>>>>>>> 2019-01-21 22:53:49,865 INFO
>>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>>>>>>>> -Xmx1024m
>>>>>>>> You are looking for this line ----> 2019-01-21 22:53:49,865 INFO
>>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>>>>>>>> -Djava.library.path=/.../org/xerial/snappy/native/Mac/x86_64/ <----
>>>>>>>> 2019-01-21 22:53:49,865 INFO
>>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>>>>>>>> -Dlog.file=/.../flink-1.7.0/log/flink-standalonesession-0.local.log
>>>>>>>> ...
>>>>>>>> 2019-01-21 22:53:49,866 INFO
>>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>>>>>>>> Program Arguments:
>>>>>>>> 2019-01-21 22:53:49,866 INFO
>>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>>>>>>>> --configDir
>>>>>>>> 2019-01-21 22:53:49,866 INFO
>>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>>>>>>>> /.../flink-1.7.0/conf
>>>>>>>> 2019-01-21 22:53:49,866 INFO
>>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>>>>>>>> --executionMode
>>>>>>>> 2019-01-21 22:53:49,866 INFO
>>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>>>>>>>> cluster
>>>>>>>> ...
>>>>>>>> 2019-01-21 22:53:49,866 INFO
>>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>>>>>>>> --------------------------------------------------------------------------------
>>>>>>>>
>>>>>>>> Can you verify that you see the log messages as expected?
>>>>>>>>
>>>>>>>> (3) As noted in FLINK-11402, is it possible to package the snappy library
>>>>>>>> as part of your user code instead of loading the library via
>>>>>>>> java.library.path? In my example, that seems to work fine.
>>>>>>>>
>>>>>>>> – Ufuk
>>>>>>>>
>>>>>>>> On Thu, Jan 17, 2019 at 5:53 PM Aaron Levin <[hidden email]> wrote:
>>>>>>>> >
>>>>>>>> > Hello!
>>>>>>>> >
>>>>>>>> > *tl;dr*: settings in `env.java.opts` seem to stop having impact when a job is canceled or fails and then is restarted (with or without savepoint/checkpoints). If I restart the task-managers, the `env.java.opts` seem to start having impact again and our job will run without failure. More below.
>>>>>>>> >
>>>>>>>> > We consume Snappy-compressed sequence files in our flink job. This requires access to the hadoop native libraries. In our `flink-conf.yaml` for both the task manager and the job manager, we put:
>>>>>>>> >
>>>>>>>> > ```
>>>>>>>> > env.java.opts: -Djava.library.path=/usr/local/hadoop/lib/native
>>>>>>>> > ```
>>>>>>>> >
>>>>>>>> > If I launch our job on freshly-restarted task managers, the job operates fine. If at some point I cancel the job or if the job restarts for some other reason, the job will begin to crashloop because it tries to open a Snappy-compressed file but doesn't have access to the codec from the native hadoop libraries in `/usr/local/hadoop/lib/native`. If I then restart the task manager while the job is crashlooping, the job starts running without any codec failures.
>>>>>>>> >
>>>>>>>> > The only reason I can conjure that would cause the Snappy compression to fail is if the `env.java.opts` were not being passed through to the job on restart for some reason.
>>>>>>>> >
>>>>>>>> > Does anyone know what's going on? Am I missing some additional configuration? I really appreciate any help!
>>>>>>>> >
>>>>>>>> > About our setup:
>>>>>>>> >
>>>>>>>> > - Flink Version: 1.7.0
>>>>>>>> > - Deployment: Standalone in HA
>>>>>>>> > - Hadoop/S3 setup: we do *not* set `HADOOP_CLASSPATH`. We use Flink’s shaded jars to access our files in S3. We do not use the `bundled-with-hadoop` distribution of Flink.
>>>>>>>> >
>>>>>>>> > Best,
>>>>>>>> >
>>>>>>>> > Aaron Levin

Re: `env.java.opts` not persisting after job canceled or failed and then restarted

Ufuk Celebi
Hey Aaron,

I'm glad to hear that you resolved the issue.

I think a docs contribution for this would be very helpful and could
update this page:
https://github.com/apache/flink/blob/master/docs/monitoring/debugging_classloading.md.

If you want to create a separate JIRA ticket for this, ping me with
your JIRA username and I'll add you to the list of contributors (which
gives you permissions to create tickets).

I'll think a bit more about the other points you mentioned and get
back to you if I have another idea.

Best,

Ufuk

On Tue, Jan 29, 2019 at 10:48 PM Aaron Levin <[hidden email]> wrote:

>
> Hi Ufuk,
>
> I'll answer your question, but first I'll give you an update on how we resolved the issue:
>
> * adding `org.apache.hadoop.io.compress.SnappyCodec` to `classloader.parent-first-patterns.additional` in `flink-conf.yaml` (though, putting `org.apache.hadoop.util.NativeCodeLoader` also worked)
> * putting a jar with `hadoop-common` + its transitive dependencies, then using jarjar[0] to `keep org.apache.hadoop.io.compress.SnappyCodec` (and its transitive dependencies). So we end up with a jar that has `SnappyCodec` and whatever it needs to call transitively. We put this jar on the task manager classpath.
>
> I believe `SnappyCodec` was being called via our code. This worked the first time but deploying a second time caused `libhadoop.so` to be loaded in a second class loader. By putting a jar with `SnappyCodec` and its transitive dependencies on the task manager classpath and specifying that `SnappyCodec` needs to be loaded from the parent classloader, we ensure that only one classloader loads `libhadoop.so`. I don't think this is the best way to achieve what we want, but it works for now.
>
> Next steps: if no one is on it, I can take a stab at updating the documentation to clarify how to debug and resolve native library loading issues. This was a nice learning experience and I think it'll be helpful to have this in the docs for those who aren't well-versed in how classloading on the JVM works!
>
> To answer your questions:
>
> 1. We install hadoop on our machines and tell flink task managers to access it via `env.java.opts: -Djava.library.path=/usr/local/hadoop/lib/native` in `flink-conf.yaml`
> 2. We put flink's shaded hadoop-fs-s3 on both the task manager and job manager classpath (I believe this is only used by the Job Managers when they interact with S3 for checkpoints etc. I don't believe any user code is using this).
> 3. Our flink applications consist of a "fat jar" that has some `org.apache.hadoop` dependencies bundled with it. I believe this is why we end up loading `SnappyCodec` twice and triggering this issue.
> 4. For example code: we have a small wrapper around `org.apache.flink.api.common.io.FileInputFormat` which does the work with sequence files. It looks like (after removing some stuff to make it more clear):
>
> ```
> abstract class FlinkSequenceFileInputFormat[T, K <: Writable, V <: Writable](
>     typeInformation: TypeInformation[T]
> ) extends FileInputFormat[T]
>     with ResultTypeQueryable[T] {
>   @transient private var bufferedNextRecord: T = _
>   @transient private var hadoopStream: HadoopFSDataInputStream = _
>   @transient private var sequenceFileReader: SequenceFile.Reader = _
>
>   unsplittable = true
>   enumerateNestedFiles = true
>
>   // *****************************************
>   // This is where we'd see exceptions.
>   // *****************************************
>   override def open(fileSplit: FileInputSplit): Unit = {
>     super.open(fileSplit)
>     val config = new Configuration()
>     hadoopStream = WrappedHadoopInputStream.wrap(stream)
>     sequenceFileReader = new SequenceFile.Reader(config, SequenceFile.Reader.stream(hadoopStream))
>     bufferNextRecord()
>   }
> ...
> }
>
> // AND
>
> class WrappedHadoopInputStream(underlying: FlinkFSDataInputStream)
>     extends InputStream
>     with Seekable
>     with PositionedReadable {
>
>   def read(): Int = underlying.read()
>   def seek(pos: Long): Unit = underlying.seek(pos)
>   def getPos: Long = underlying.getPos
> }
> ...
> ```
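>
> As an aside, since the snippet above elides the rest of the wrapper: here is a rough, hypothetical self-contained version (not our exact code) for anyone who wants to try the same pattern. The `wrap` helper, the import aliases, and the seek-and-restore positioned reads are illustrative, and none of it is thread-safe:
>
> ```
> import java.io.{EOFException, InputStream}
>
> import org.apache.flink.core.fs.{FSDataInputStream => FlinkFSDataInputStream}
> import org.apache.hadoop.fs.{FSDataInputStream => HadoopFSDataInputStream, PositionedReadable, Seekable}
>
> // Hadoop's SequenceFile.Reader needs a stream that is Seekable and
> // PositionedReadable; this adapts Flink's stream to that contract.
> class WrappedHadoopInputStream(underlying: FlinkFSDataInputStream)
>     extends InputStream
>     with Seekable
>     with PositionedReadable {
>
>   // InputStream
>   override def read(): Int = underlying.read()
>   override def read(b: Array[Byte], off: Int, len: Int): Int = underlying.read(b, off, len)
>
>   // Seekable
>   def seek(pos: Long): Unit = underlying.seek(pos)
>   def getPos: Long = underlying.getPos
>   def seekToNewSource(targetPos: Long): Boolean = false // only one underlying source
>
>   // PositionedReadable: seek, read, then restore the previous position.
>   def read(position: Long, buffer: Array[Byte], offset: Int, length: Int): Int = {
>     val oldPos = underlying.getPos
>     try {
>       underlying.seek(position)
>       underlying.read(buffer, offset, length)
>     } finally underlying.seek(oldPos)
>   }
>
>   def readFully(position: Long, buffer: Array[Byte], offset: Int, length: Int): Unit = {
>     var done = 0
>     while (done < length) {
>       val n = read(position + done, buffer, offset + done, length - done)
>       if (n < 0) throw new EOFException("Hit end of stream before reading fully")
>       done += n
>     }
>   }
>
>   def readFully(position: Long, buffer: Array[Byte]): Unit =
>     readFully(position, buffer, 0, buffer.length)
> }
>
> object WrappedHadoopInputStream {
>   // Hadoop's FSDataInputStream accepts an InputStream that also implements
>   // Seekable and PositionedReadable, which the wrapper above satisfies.
>   def wrap(in: FlinkFSDataInputStream): HadoopFSDataInputStream =
>     new HadoopFSDataInputStream(new WrappedHadoopInputStream(in))
> }
> ```
>
> The important bit is that whatever ends up wrapped in Hadoop's `FSDataInputStream` implements both `Seekable` and `PositionedReadable`, which is why the wrapper exists at all.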
>
> Thanks for all your help, I appreciate it! I wouldn't have been able to debug and resolve this if it wasn't for you filing the ticket. Thank you so much!
>
> [0] https://github.com/pantsbuild/jarjar
>
> Aaron Levin
>
> On Mon, Jan 28, 2019 at 4:16 AM Ufuk Celebi <[hidden email]> wrote:
>>
>> Hey Aaron,
>>
>> sorry for the late reply (again).
>>
>> (1) I think that your final result is in line with what I have
>> reproduced in https://issues.apache.org/jira/browse/FLINK-11402.
>>
>> (2) I think renaming the file would not help as it will still be
>> loaded multiple times when the job restarts (as it happens in
>> FLINK-11402).
>>
>> (3) I'll try to check whether Flink's shading of Hadoop is related to
>> this. I don't think so though. @Chesnay (cc'd): What do you think?
>>
>> (4) @Aaron: Can you tell me which Hadoop libraries you use and share
>> some code so I can try to reproduce this exactly on my side? Judging
>> from the earlier stack traces you have shared, I'm assuming you are
>> trying to read Snappy-compressed sequence files.
>>
>> – Ufuk
>>
>> On Fri, Jan 25, 2019 at 4:37 PM Aaron Levin <[hidden email]> wrote:
>> >
>> > I don't control the code calling `System.loadLibrary("hadoop")` so that's not an option for me, unfortunately.
>> >
>> > On Thu, Jan 24, 2019 at 7:47 PM Guowei Ma <[hidden email]> wrote:
>> >>
>> >> This may be caused by the fact that a JVM process can only load a given .so once. So a tricky workaround is to rename it.
>> >>
>> >> Sent from my iPhone
>> >>
>> >> On Jan 25, 2019, at 7:12 AM, Aaron Levin <[hidden email]> wrote:
>> >>
>> >> Hi Ufuk,
>> >>
>> >> Update: I've pinned down the issue. It's multiple classloaders loading `libhadoop.so`:
>> >>
>> >> ```
>> >> failed to load native hadoop with error: java.lang.UnsatisfiedLinkError: Native Library /usr/lib/libhadoop.so already loaded in another classloader
>> >> ```
>> >>
>> >> I'm not quite sure what the solution is. Ideally flink would destroy a classloader when a job is canceled, but perhaps there's a JVM limitation there? Putting the libraries into `/usr/lib` or `/lib` does not work (as suggested by Chesnay in the ticket); I get the same error. I might see if I can put a jar with `org.apache.hadoop.io.compress` in `/flink/install/lib` and then remove it from my jar. It's not an ideal solution but I can't think of anything else.
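>> >>
>> >> In case it helps anyone else chasing this down, here's a tiny throwaway helper (hypothetical; it assumes `hadoop-common` is on the classpath of whatever invokes it) that shows which classloader owns the class that ends up calling `System.loadLibrary("hadoop")`:
>> >>
>> >> ```
>> >> object ClassLoaderDebug {
>> >>   // Call this from the job (e.g. inside open()) before and after a restart.
>> >>   // If NativeCodeLoader moves from the parent/app classloader to a fresh
>> >>   // user-code classloader, two classloaders are competing for libhadoop.so.
>> >>   def report(): Unit = {
>> >>     val cls = Class.forName("org.apache.hadoop.util.NativeCodeLoader")
>> >>     println(s"NativeCodeLoader loaded by: ${cls.getClassLoader}")
>> >>     println(s"caller loaded by:           ${getClass.getClassLoader}")
>> >>   }
>> >> }
>> >> ```
>> >>
>> >> Seeing two different loaders there after a cancel/restart would be consistent with the error above.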
>> >>
>> >> Best,
>> >>
>> >> Aaron Levin
>> >>
>> >> On Thu, Jan 24, 2019 at 12:18 PM Aaron Levin <[hidden email]> wrote:
>> >>>
>> >>> Hi Ufuk,
>> >>>
>> >>> I'm starting to believe the bug is much deeper than the originally reported error because putting the libraries in `/usr/lib` or `/lib` does not work. This morning I dug into why putting `libhadoop.so` into `/usr/lib` didn't work, despite that being in the `java.library.path` at the call site of the error. I wrote a small program to test the loading of native libraries, and it was able to successfully load `libhadoop.so`. I'm very perplexed. Could this be related to the way flink shades hadoop stuff?
>> >>>
>> >>> Here is my program and its output:
>> >>>
>> >>> ```
>> >>> $ cat LibTest.scala
>> >>> package com.redacted.flink
>> >>>
>> >>> object LibTest {
>> >>>   def main(args: Array[String]): Unit = {
>> >>>     val library = args(0)
>> >>>     System.out.println(s"java.library.path=${System.getProperty("java.library.path")}")
>> >>>     System.out.println(s"Attempting to load $library")
>> >>>     System.out.flush()
>> >>>     System.loadLibrary(library)
>> >>>     System.out.println(s"Successfully loaded ")
>> >>>     System.out.flush()
>> >>>   }
>> >>> }
>> >>> ```
>> >>>
>> >>> I then tried running that on one of the task managers with `hadoop` as an argument:
>> >>>
>> >>> ```
>> >>> $ java -jar lib_test_deploy.jar hadoop
>> >>> java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
>> >>> Attempting to load hadoop
>> >>> Exception in thread "main" java.lang.UnsatisfiedLinkError: no hadoop in java.library.path
>> >>> at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
>> >>> at java.lang.Runtime.loadLibrary0(Runtime.java:870)
>> >>> at java.lang.System.loadLibrary(System.java:1122)
>> >>> at com.redacted.flink.LibTest$.main(LibTest.scala:11)
>> >>> at com.redacted.flink.LibTest.main(LibTest.scala)
>> >>> ```
>> >>>
>> >>> I then copied the native libraries into `/usr/lib/` and ran it again:
>> >>>
>> >>> ```
>> >>> $ sudo cp /usr/local/hadoop/lib/native/libhadoop.so /usr/lib/
>> >>> $ java -jar lib_test_deploy.jar hadoop
>> >>> java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
>> >>> Attempting to load hadoop
>> >>> Successfully loaded
>> >>> ```
>> >>>
>> >>> Any ideas?
>> >>>
>> >>> On Wed, Jan 23, 2019 at 7:13 PM Aaron Levin <[hidden email]> wrote:
>> >>>>
>> >>>> Hi Ufuk,
>> >>>>
>> >>>> One more update: I tried copying all the hadoop native `.so` files (mainly `libhadoop.so`) into `/lib` and I am still experiencing the issue I reported. I also tried naively adding the `.so` files to the jar with the flink application and am still experiencing the issue (however, I'm going to investigate this further as I might not have done it correctly).
>> >>>>
>> >>>> Best,
>> >>>>
>> >>>> Aaron Levin
>> >>>>
>> >>>> On Wed, Jan 23, 2019 at 3:18 PM Aaron Levin <[hidden email]> wrote:
>> >>>>>
>> >>>>> Hi Ufuk,
>> >>>>>
>> >>>>> Two updates:
>> >>>>>
>> >>>>> 1. As suggested in the ticket, I naively copied every `.so` in `hadoop-3.0.0/lib/native/` into `/lib/` and this did not seem to help. My knowledge of how shared libs get picked up is hazy, so I'm not sure if blindly copying them like that should work. I did check what `System.getProperty("java.library.path")` returns at the call-site and it's: java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
>> >>>>> 2. The exception I see comes from `hadoop.util.NativeCodeLoader.buildSupportsSnappy` (stack-trace below). This uses `System.loadLibrary("hadoop")`.
>> >>>>>
>> >>>>> [2019-01-23 19:52:33.081216] java.lang.UnsatisfiedLinkError: org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
>> >>>>> [2019-01-23 19:52:33.081376]  at org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method)
>> >>>>> [2019-01-23 19:52:33.081406]  at org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)
>> >>>>> [2019-01-23 19:52:33.081429]  at org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:195)
>> >>>>> [2019-01-23 19:52:33.081457]  at org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:181)
>> >>>>> [2019-01-23 19:52:33.081494]  at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:2037)
>> >>>>> [2019-01-23 19:52:33.081517]  at org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1923)
>> >>>>> [2019-01-23 19:52:33.081549]  at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1872)
>> >>>>> ... (redacted) ...
>> >>>>> [2019-01-23 19:52:33.081728]  at scala.collection.immutable.List.foreach(List.scala:392)
>> >>>>> ... (redacted) ...
>> >>>>> [2019-01-23 19:52:33.081832]  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
>> >>>>> [2019-01-23 19:52:33.081854]  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>> >>>>> [2019-01-23 19:52:33.081882]  at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
>> >>>>> [2019-01-23 19:52:33.081904]  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>> >>>>> [2019-01-23 19:52:33.081946]  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>> >>>>> [2019-01-23 19:52:33.081967]  at java.lang.Thread.run(Thread.java:748)
>> >>>>>
>> >>>>> On Tue, Jan 22, 2019 at 2:31 PM Aaron Levin <[hidden email]> wrote:
>> >>>>>>
>> >>>>>> Hey Ufuk,
>> >>>>>>
>> >>>>>> So, I looked into this a little bit:
>> >>>>>>
>> >>>>>> 1. clarification: my issues are with the hadoop-related snappy libraries and not libsnappy itself (this is my bad for not being clearer, sorry!). I already have `libsnappy` on my classpath, but I am looking into including the hadoop snappy libraries.
>> >>>>>> 2. exception: I don't see the class loading error. I'm going to try to put some more instrumentation and see if I can get a clearer stacktrace (right now I get an NPE on closing a sequence file in a finalizer - when I last logged the exception it was something deep in hadoop's snappy libs - I'll get clarification soon).
>> >>>>>> 3. I'm looking into including hadoop's snappy libs in my jar and we'll see if that resolves the problem.
>> >>>>>>
>> >>>>>> Thanks again for your help!
>> >>>>>>
>> >>>>>> Best,
>> >>>>>>
>> >>>>>> Aaron Levin
>> >>>>>>
>> >>>>>> On Tue, Jan 22, 2019 at 10:47 AM Aaron Levin <[hidden email]> wrote:
>> >>>>>>>
>> >>>>>>> Hey,
>> >>>>>>>
>> >>>>>>> Thanks so much for the help! This is awesome. I'll start looking into all of this right away and report back.
>> >>>>>>>
>> >>>>>>> Best,
>> >>>>>>>
>> >>>>>>> Aaron Levin
>> >>>>>>>
>> >>>>>>> On Mon, Jan 21, 2019 at 5:16 PM Ufuk Celebi <[hidden email]> wrote:
>> >>>>>>>>
>> >>>>>>>> Hey Aaron,
>> >>>>>>>>
>> >>>>>>>> sorry for the late reply.
>> >>>>>>>>
>> >>>>>>>> (1) I think I was able to reproduce this issue using snappy-java. I've
>> >>>>>>>> filed a ticket here:
>> >>>>>>>> https://issues.apache.org/jira/browse/FLINK-11402. Can you check the
>> >>>>>>>> ticket description whether it's in line with what you are
>> >>>>>>>> experiencing? Most importantly, do you see the same Exception being
>> >>>>>>>> reported after cancelling and re-starting the job?
>> >>>>>>>>
>> >>>>>>>> (2) I don't think it's caused by the environment options not being
>> >>>>>>>> picked up. You can check the head of the log files of the JobManager
>> >>>>>>>> or TaskManager to verify that your provided option is picked up as
>> >>>>>>>> expected. You should see something similar to this:
>> >>>>>>>>
>> >>>>>>>> 2019-01-21 22:53:49,863 INFO
>> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>> >>>>>>>> --------------------------------------------------------------------------------
>> >>>>>>>> 2019-01-21 22:53:49,864 INFO
>> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>> >>>>>>>> Starting StandaloneSessionClusterEntrypoint (Version: 1.7.0,
>> >>>>>>>> Rev:49da9f9, Date:28.11.2018 @ 17:59:06 UTC)
>> >>>>>>>> ...
>> >>>>>>>> 2019-01-21 22:53:49,865 INFO
>> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  JVM
>> >>>>>>>> Options:
>> >>>>>>>> 2019-01-21 22:53:49,865 INFO
>> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>> >>>>>>>> -Xms1024m
>> >>>>>>>> 2019-01-21 22:53:49,865 INFO
>> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>> >>>>>>>> -Xmx1024m
>> >>>>>>>> You are looking for this line ----> 2019-01-21 22:53:49,865 INFO
>> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>> >>>>>>>> -Djava.library.path=/.../org/xerial/snappy/native/Mac/x86_64/ <----
>> >>>>>>>> 2019-01-21 22:53:49,865 INFO
>> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>> >>>>>>>> -Dlog.file=/.../flink-1.7.0/log/flink-standalonesession-0.local.log
>> >>>>>>>> ...
>> >>>>>>>> 2019-01-21 22:53:49,866 INFO
>> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>> >>>>>>>> Program Arguments:
>> >>>>>>>> 2019-01-21 22:53:49,866 INFO
>> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>> >>>>>>>> --configDir
>> >>>>>>>> 2019-01-21 22:53:49,866 INFO
>> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>> >>>>>>>> /.../flink-1.7.0/conf
>> >>>>>>>> 2019-01-21 22:53:49,866 INFO
>> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>> >>>>>>>> --executionMode
>> >>>>>>>> 2019-01-21 22:53:49,866 INFO
>> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>> >>>>>>>> cluster
>> >>>>>>>> ...
>> >>>>>>>> 2019-01-21 22:53:49,866 INFO
>> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>> >>>>>>>> --------------------------------------------------------------------------------
>> >>>>>>>>
>> >>>>>>>> Can you verify that you see the log messages as expected?
>> >>>>>>>>
>> >>>>>>>> (3) As noted in FLINK-11402, is it possible to package the snappy library
>> >>>>>>>> as part of your user code instead of loading the library via
>> >>>>>>>> java.library.path? In my example, that seems to work fine.
>> >>>>>>>>
>> >>>>>>>> – Ufuk
>> >>>>>>>>
>> >>>>>>>> On Thu, Jan 17, 2019 at 5:53 PM Aaron Levin <[hidden email]> wrote:
>> >>>>>>>> >
>> >>>>>>>> > Hello!
>> >>>>>>>> >
>> >>>>>>>> > *tl;dr*: settings in `env.java.opts` seem to stop having impact when a job is canceled or fails and then is restarted (with or without savepoint/checkpoints). If I restart the task-managers, the `env.java.opts` seem to start having impact again and our job will run without failure. More below.
>> >>>>>>>> >
>> >>>>>>>> > We use consume Snappy-compressed sequence files in our flink job. This requires access to the hadoop native libraries. In our `flink-conf.yaml` for both the task manager and the job manager, we put:
>> >>>>>>>> >
>> >>>>>>>> > ```
>> >>>>>>>> > env.java.opts: -Djava.library.path=/usr/local/hadoop/lib/native
>> >>>>>>>> > ```
>> >>>>>>>> >
>> >>>>>>>> > If I launch our job on freshly-restarted task managers, the job operates fine. If at some point I cancel the job or if the job restarts for some other reason, the job will begin to crashloop because it tries to open a Snappy-compressed file but doesn't have access to the codec from the native hadoop libraries in `/usr/local/hadoop/lib/native`. If I then restart the task manager while the job is crashlooping, the job is start running without any codec failures.
>> >>>>>>>> >
>> >>>>>>>> > The only reason I can conjure that would cause the Snappy compression to fail is if the `env.java.opts` were not being passed through to the job on restart for some reason.
>> >>>>>>>> >
>> >>>>>>>> > Does anyone know what's going on? Am I missing some additional configuration? I really appreciate any help!
>> >>>>>>>> >
>> >>>>>>>> > About our setup:
>> >>>>>>>> >
>> >>>>>>>> > - Flink Version: 1.7.0
>> >>>>>>>> > - Deployment: Standalone in HA
>> >>>>>>>> > - Hadoop/S3 setup: we do *not* set `HADOOP_CLASSPATH`. We use Flink’s shaded jars to access our files in S3. We do not use the `bundled-with-hadoop` distribution of Flink.
>> >>>>>>>> >
>> >>>>>>>> > Best,
>> >>>>>>>> >
>> >>>>>>>> > Aaron Levin