Class loading issues when using Remote Execution Environment


kedar mhaswade
I am trying to get gradoop_demo (a Gradoop-based graph visualization app) working on Flink with a RemoteEnvironment.

This app, which is based on Gradoop, submits a job to the preconfigured execution environment, collects the results, and sends them to the UI for rendering.

When the execution environment is configured to be a LocalEnvironment, everything works fine. But when I start a cluster (using <flink-install-path>/bin/start-cluster.sh), take the JobManager endpoint (e.g. localhost:6123), configure a RemoteEnvironment, and use that environment to run the job, I get the exceptions in [1].

Based on the class loading docs, I copied the Gradoop JARs (gradoop-flink-0.3.3-SNAPSHOT.jar, gradoop-common-0.3.3-SNAPSHOT.jar) to the <flink-install-path>/lib folder (hoping that they would then be available to all executors in the cluster). I have verified that the class Flink fails to load is in fact present in the Gradoop JARs that I copied to the /lib folder.

I have also tried the RemoteEnvironment variant that takes a jarFiles argument, where the passed JAR file is a fat JAR containing everything (in which case there is no Gradoop JAR in the /lib folder).
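
For reference, here is a minimal sketch of that jarFiles approach (my own illustration, not the demo's actual code; the host, port, JAR path, and the trivial job are placeholders):

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class RemoteEnvSketch {
    public static void main(String[] args) throws Exception {
        // Placeholders: JobManager host/port and the fat JAR to ship with each job.
        ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
            "localhost", 6123, "/path/to/gradoop-demo-shaded.jar");

        DataSet<String> data = env.fromElements("a", "b", "c");
        data.writeAsText("/tmp/remote-env-sketch-output");

        // execute() blocks until the job finishes and returns its result,
        // which is why a fire-and-forget "flink run" is not a substitute here.
        JobExecutionResult result = env.execute("remote environment sketch");
        System.out.println("Runtime (ms): " + result.getNetRuntime());
    }
}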

So, my questions are:
1) How can I get RemoteEnvironment to work here?
2) Is there any other way of doing this programmatically? (flink run is not an option, since I need the job execution result as a blocking call, which also means I would ideally rather not use the REST submission API.) I just want RemoteEnvironment to work as well as LocalEnvironment does.

Regards,
Kedar


[1]
2018-04-24 15:16:02,823 ERROR org.apache.flink.runtime.jobmanager.JobManager                - Failed to submit job 0c987c8704f8b7eb4d7d38efcb3d708d (Flink Java Job at Tue Apr 24 15:15:59 PDT 2018)
java.lang.NoClassDefFoundError: Could not initialize class org.gradoop.common.model.impl.id.GradoopId
  at java.io.ObjectStreamClass.hasStaticInitializer(Native Method)
  at java.io.ObjectStreamClass.computeDefaultSUID(ObjectStreamClass.java:1887)
  at java.io.ObjectStreamClass.access$100(ObjectStreamClass.java:79)
  at java.io.ObjectStreamClass$1.run(ObjectStreamClass.java:263)
  at java.io.ObjectStreamClass$1.run(ObjectStreamClass.java:261)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.io.ObjectStreamClass.getSerialVersionUID(ObjectStreamClass.java:260)
  at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:682)
  at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1876)
  at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1745)
  at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1710)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1550)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
  at java.util.HashSet.readObject(HashSet.java:341)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
  at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)....

Re: Class loading issues when using Remote Execution Environment

Chesnay Schepler
I couldn't spot any error in what you tried to do. Does the job submission succeed if you submit the JAR through the command-line client?

Can you share the project, or a minimal reproducing version?

Re: Class loading issues when using Remote Execution Environment

kedar mhaswade
Thank you for your response!

I have not tried the flink run app.jar route because the way the app is set up does not allow it. Basically, the app is a web application that serves the UI and also submits a Flink job for running Cypher queries. It is a proof-of-concept app, but IMO a very useful one.

Here's how you can reproduce:
1) git clone [hidden email]:kedarmhaswade/gradoop_demo.git (this is my fork of gradoop_demo)
2) cd gradoop_demo
3) git checkout dev => dev is the branch containing my changes to make Gradoop work with a remote environment.
4) mvn clean package => should pull in the Gradoop JARs that this app needs; these JARs should then be placed in <flink-install>/lib (see step 5).
5) cp ~/.m2/repository/org/gradoop/gradoop-common/0.3.2/gradoop-common-0.3.2.jar <flink-install>/lib, cp ~/.m2/repository/org/gradoop/gradoop-flink/0.3.2/gradoop-flink-0.3.2.jar <flink-install>/lib, cp target/gradoop-demo-0.2.0.jar <flink-install>/lib.
6) Start the local Flink cluster with <flink-install>/bin/start-cluster.sh (I have tried the latest built-from-source 1.6-SNAPSHOT as well as 1.4) -- note the JM host and port.
7) <gradoop-demo>/start.sh --jmhost <host> --jmport 6123 (adjust host and port per your cluster) => the app is now configured to talk to a RemoteEnvironment at the given JM host and port.
8) Hit the query button => this throws the exception.
9) Ctrl-C the process from step 7 and restart it as java -cp target/classes:target/gradoop-demo-shaded.jar org.gradoop.demo.server.Server => this starts a LocalEnvironment.
10) Repeat step 8 and see the results rendered nicely in the browser.

Here is the relevant code:
1) Choosing between a Remote and a Local Environment (a minimal sketch follows).
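
A minimal sketch of that choice, assuming hypothetical helper and argument names (this is not the demo's actual code):

import org.apache.flink.api.java.ExecutionEnvironment;

// Hypothetical helper: pick a RemoteEnvironment when a JobManager host/port
// is supplied (e.g. via --jmhost/--jmport), otherwise fall back to a LocalEnvironment.
public final class EnvFactory {
    public static ExecutionEnvironment create(String jmHost, Integer jmPort, String jarFile) {
        if (jmHost != null && jmPort != null) {
            // Ships the given JAR with every job submitted through this environment.
            return ExecutionEnvironment.createRemoteEnvironment(jmHost, jmPort, jarFile);
        }
        // Runs the job in the same JVM as the web server (the case that works today).
        return ExecutionEnvironment.createLocalEnvironment();
    }
}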

To my knowledge these instructions are correct. Thanks for your willingness to try this. I have tried everything I can think of; with different Flink versions I get different results (I have also tried 1.6-SNAPSHOT with the classloading configuration set to parent-first and to child-first).

Regards,
Kedar


Re: Class loading issues when using Remote Execution Environment

Chesnay Schepler
Small update:

I could reproduce your problems locally when submitting the fat-jar.
I could get the job to run after placing the gradoop-demo-shaded.jar into the lib folder.
I have not yet tried placing only the Gradoop JARs into lib (but my guess is that you missed a Gradoop JAR).

Note that the job still fails to run, because you use LocalCollectionOutputFormat, which can only be used for local execution, i.e. when job submission and execution happen in the same JVM.
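
To illustrate the difference, a minimal sketch with placeholder data and paths (not taken from the demo):

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;

public class CollectSketch {

    // Only valid with a LocalEnvironment: the output format fills a collection that
    // lives in the submitting JVM, which a remote TaskManager cannot reach.
    static List<String> localOnly(ExecutionEnvironment env) throws Exception {
        List<String> out = new ArrayList<>();
        env.fromElements("a", "b", "c").output(new LocalCollectionOutputFormat<>(out));
        env.execute("local-only sketch");
        return out;
    }

    // Works with a RemoteEnvironment as well: collect() triggers execution and ships
    // the (small) result back to the client, blocking until the job finishes.
    static List<String> remoteFriendly(ExecutionEnvironment env) throws Exception {
        DataSet<String> data = env.fromElements("a", "b", "c");
        return data.collect();
    }
}

Since collect() transports results back through accumulators attached to the job result, it only suits small result sets, but it works regardless of where the job executes.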

Re: Class loading issues when using Remote Execution Environment

kedar mhaswade
Thanks Chesnay for your incredible help! 

I will try out the suggestions again. A few questions:
- What version of Flink are you trying with? I have had issues when I placed the gradoop-demo-shaded.jar in the lib folder of the Flink installation (1.4 even refused to start!).
- Are there other config changes (flink-conf.yaml) that you made in your cluster?
- Is org.apache.flink.api.common.io.FileOutputFormat a good alternative to LocalCollectionOutputFormat, or should I use HadoopOutputFormatCommonBase? (I do want to run the cluster on YARN later; at the moment I am trying on a standalone cluster.)
- Do you think the jarFiles argument on createRemoteEnvironment (which ships the JAR only for this job and does not touch the rest of the Flink cluster) is a better option than placing the JAR(s) in the lib folder?

Thanks again,
Regards,
Kedar


Re: Class loading issues when using Remote Execution Environment

Chesnay Schepler
First, a small correction for my previous mail:

I could reproduce your problems locally when submitting the fat-jar.
Turns out I never submitted the fat-jar, as I didn't pass the jar file argument to the RemoteEnvironment.

Now on to your questions:

What version of Flink are you trying with?
I got it working once with 1.6-SNAPSHOT, but I would recommend sticking with 1.3.1, since that is the version Gradoop depends on. (I haven't tried it with that version yet, but it is the next thing on my list.)

Are there other config changes (flink-conf.yaml) that you made in your cluster?
It was the standard config.

Is org.apache.flink.api.common.io.FileOutputFormat a good alternative to LocalCollectionOutputFormat?
It can be used, but if the result is small you could also use accumulators (see the sketch after these answers).

Do you think it is better to use the jarFiles argument on createRemoteEnvironment?
Yes, once we get it working, this is the way to go.
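
A minimal sketch of the accumulator approach, with placeholder names and paths (not the demo's actual code):

import java.util.List;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.ListAccumulator;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class AccumulatorSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
            "localhost", 6123, "/path/to/job.jar"); // placeholders

        env.fromElements("a", "b", "c")
            .map(new RichMapFunction<String, String>() {
                private final ListAccumulator<String> results = new ListAccumulator<>();

                @Override
                public void open(Configuration parameters) {
                    // Register the accumulator under a well-known name.
                    getRuntimeContext().addAccumulator("collected", results);
                }

                @Override
                public String map(String value) {
                    results.add(value); // keep this small; it travels back with the job result
                    return value;
                }
            })
            .writeAsText("/tmp/accumulator-sketch-output");

        JobExecutionResult result = env.execute("accumulator sketch");
        List<String> collected = result.getAccumulatorResult("collected");
        System.out.println(collected);
    }
}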

Re: Class loading issues when using Remote Execution Environment

kedar mhaswade
Thanks again!

This is strange. With both Flink 1.3.3 and Flink 1.6.0-SNAPSHOT and 
1) copying gradoop-demo-shaded.jar to <Flink>/lib, and
2) using RemoteEnvironment with just jmHost and jmPort (no jarFiles)

I get the same exception [1], caused by: 
Caused by: com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'akka.remote.log-received-messages'.

This key is not documented anywhere, so I am confused. Also, with the JAR copied as above, even though the JM and TM are running, the Flink dashboard on http://localhost:8081 is unavailable!

With Flink 1.3.3 and Flink 1.6.0-SNAPSHOT
1) NOT copying gradoop-demo-shaded.jar into <Flink>/lib, and
2) using RemoteEnvironment with jmHost, jmPort, and jarFiles = {<absolute-path-to-gradoop-demo-shaded.jar>}

I get the same exception; however, the Flink dashboard on http://localhost:8081 is available! This makes me believe that this is somehow an insidious classloading issue :(.
I am really perplexed by this behavior. Let me stick with the Flink 1.3.3 installation, as you suggested, for now.

If you have any other debugging tips, please let me know, but I am running out of ideas for making this run with a non-local environment.

Regards,
Kedar




[1] Gradoop shaded jar in <Flink>/lib -- exception on the web-app:
org.apache.flink.client.program.ProgramInvocationException: Could not start the ActorSystem needed to talk to the JobManager.
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:461)
at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:429)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:211)
at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:188)
at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:172)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:926)
at org.gradoop.demo.server.RequestHandler.getResponse(RequestHandler.java:447)
at org.gradoop.demo.server.RequestHandler.createResponse(RequestHandler.java:430)
at org.gradoop.demo.server.RequestHandler.executeCypher(RequestHandler.java:121)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.sun.jersey.spi.container.JavaMethodInvokerFactory$1.invoke(JavaMethodInvokerFactory.java:60)
at com.sun.jersey.server.impl.model.method.dispatch.AbstractResourceMethodDispatchProvider$ResponseOutInvoker._dispatch(AbstractResourceMethodDispatchProvider.java:205)
at com.sun.jersey.server.impl.model.method.dispatch.ResourceJavaMethodDispatcher.dispatch(ResourceJavaMethodDispatcher.java:75)
at com.sun.jersey.server.impl.uri.rules.HttpMethodRule.accept(HttpMethodRule.java:302)
at com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147)
at com.sun.jersey.server.impl.uri.rules.ResourceClassRule.accept(ResourceClassRule.java:108)
at com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147)
at com.sun.jersey.server.impl.uri.rules.RootResourceClassesRule.accept(RootResourceClassesRule.java:84)
at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1542)
at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1473)
at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1419)
at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1409)
at com.sun.jersey.server.impl.container.grizzly2.GrizzlyContainer._service(GrizzlyContainer.java:222)
at com.sun.jersey.server.impl.container.grizzly2.GrizzlyContainer.service(GrizzlyContainer.java:192)
at org.glassfish.grizzly.http.server.HttpHandler.doHandle(HttpHandler.java:164)
at org.glassfish.grizzly.http.server.HttpHandlerChain.service(HttpHandlerChain.java:196)
at org.glassfish.grizzly.http.server.HttpHandler.doHandle(HttpHandler.java:164)
at org.glassfish.grizzly.http.server.HttpServerFilter.handleRead(HttpServerFilter.java:175)
at org.glassfish.grizzly.filterchain.ExecutorResolver$9.execute(ExecutorResolver.java:119)
at org.glassfish.grizzly.filterchain.DefaultFilterChain.executeFilter(DefaultFilterChain.java:265)
at org.glassfish.grizzly.filterchain.DefaultFilterChain.executeChainPart(DefaultFilterChain.java:200)
at org.glassfish.grizzly.filterchain.DefaultFilterChain.execute(DefaultFilterChain.java:134)
at org.glassfish.grizzly.filterchain.DefaultFilterChain.process(DefaultFilterChain.java:112)
at org.glassfish.grizzly.ProcessorExecutor.execute(ProcessorExecutor.java:78)
at org.glassfish.grizzly.nio.transport.TCPNIOTransport.fireIOEvent(TCPNIOTransport.java:815)
at org.glassfish.grizzly.strategies.AbstractIOStrategy.fireIOEvent(AbstractIOStrategy.java:112)
at org.glassfish.grizzly.strategies.WorkerThreadIOStrategy.run0(WorkerThreadIOStrategy.java:115)
at org.glassfish.grizzly.strategies.WorkerThreadIOStrategy.access$100(WorkerThreadIOStrategy.java:55)
at org.glassfish.grizzly.strategies.WorkerThreadIOStrategy$WorkerThreadRunnable.run(WorkerThreadIOStrategy.java:135)
at org.glassfish.grizzly.threadpool.AbstractThreadPool$Worker.doWork(AbstractThreadPool.java:567)
at org.glassfish.grizzly.threadpool.AbstractThreadPool$Worker.run(AbstractThreadPool.java:547)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not start the ActorSystem lazily.
at org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:230)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:459)
... 47 more
Caused by: com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'akka.remote.log-received-messages'
at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:124)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:145)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:151)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:151)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:159)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:164)
at com.typesafe.config.impl.SimpleConfig.getBoolean(SimpleConfig.java:174)
at akka.remote.RemoteSettings.<init>(RemoteSettings.scala:24)
at akka.remote.RemoteActorRefProvider.<init>(RemoteActorRefProvider.scala:114)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
at scala.util.Try$.apply(Try.scala:192)
at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
at scala.util.Success.flatMap(Try.scala:231)
at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)
at akka.actor.ActorSystemImpl.liftedTree1$1(ActorSystem.scala:585)
at akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:578)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:142)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:119)
at akka.actor.ActorSystem$.create(ActorSystem.scala:67)
at org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:104)
at org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:92)
at org.apache.flink.runtime.akka.AkkaUtils.createActorSystem(AkkaUtils.scala)
at org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:226)


Re: Class loading issues when using Remote Execution Environment

kedar mhaswade
Chesnay,

I have filed https://issues.apache.org/jira/browse/FLINK-9267 to keep track of this issue.

Regards,
Kedar

at org.glassfish.grizzly.ProcessorExecutor.execute(ProcessorExecutor.java:78)
at org.glassfish.grizzly.nio.transport.TCPNIOTransport.fireIOEvent(TCPNIOTransport.java:815)
at org.glassfish.grizzly.strategies.AbstractIOStrategy.fireIOEvent(AbstractIOStrategy.java:112)
at org.glassfish.grizzly.strategies.WorkerThreadIOStrategy.run0(WorkerThreadIOStrategy.java:115)
at org.glassfish.grizzly.strategies.WorkerThreadIOStrategy.access$100(WorkerThreadIOStrategy.java:55)
at org.glassfish.grizzly.strategies.WorkerThreadIOStrategy$WorkerThreadRunnable.run(WorkerThreadIOStrategy.java:135)
at org.glassfish.grizzly.threadpool.AbstractThreadPool$Worker.doWork(AbstractThreadPool.java:567)
at org.glassfish.grizzly.threadpool.AbstractThreadPool$Worker.run(AbstractThreadPool.java:547)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not start the ActorSystem lazily.
at org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:230)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:459)
... 47 more
Caused by: com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'akka.remote.log-received-messages'
at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:124)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:145)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:151)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:151)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:159)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:164)
at com.typesafe.config.impl.SimpleConfig.getBoolean(SimpleConfig.java:174)
at akka.remote.RemoteSettings.<init>(RemoteSettings.scala:24)
at akka.remote.RemoteActorRefProvider.<init>(RemoteActorRefProvider.scala:114)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
at scala.util.Try$.apply(Try.scala:192)
at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
at scala.util.Success.flatMap(Try.scala:231)
at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)
at akka.actor.ActorSystemImpl.liftedTree1$1(ActorSystem.scala:585)
at akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:578)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:142)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:119)
at akka.actor.ActorSystem$.create(ActorSystem.scala:67)
at org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:104)
at org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:92)
at org.apache.flink.runtime.akka.AkkaUtils.createActorSystem(AkkaUtils.scala)
at org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:226)


On Thu, Apr 26, 2018 at 11:52 PM, Chesnay Schepler <[hidden email]> wrote:
First, a small correction for my previous mail:

I previously wrote: "I could reproduce your problems locally when submitting the fat-jar."
Turns out I never submitted the fat-jar, as I didn't pass the jar file argument to RemoteEnvironment.

Now on to your questions:

What version of Flink are you trying with?
I got it working once with 1.6-SNAPSHOT, but I would recommend sticking with 1.3.1 since that is the version gradoop depends on. (I haven't tried it with this version yet, but that's the next thing on my list.)

Are there other config changes (flink-conf.yaml) that you made in your cluster?
It was the standard config.

Is org.apache.flink.api.common.io.FileOutputFormat a good alternative to LocalCollectionOutputFormat?
It can be used, but if the result is small you could also use accumulators.
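
For example, something along these lines (an untested sketch; the accumulator name, the map function, and the host/port/jar path are only illustrative):

import java.util.List;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.ListAccumulator;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.configuration.Configuration;

public class AccumulatorResultExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
                "localhost", 6123, "/path/to/your-job.jar");

        // stands in for the (small) result your job computes
        DataSet<String> result = env.fromElements("a", "b", "c");

        result.map(new RichMapFunction<String, String>() {
            private final ListAccumulator<String> acc = new ListAccumulator<>();

            @Override
            public void open(Configuration parameters) {
                // register the accumulator once per task
                getRuntimeContext().addAccumulator("collected", acc);
            }

            @Override
            public String map(String value) {
                acc.add(value); // accumulate each record on the TaskManager
                return value;
            }
        }).output(new DiscardingOutputFormat<String>());

        // execute() blocks until the job finishes; accumulators travel back with the result
        JobExecutionResult jobResult = env.execute();
        List<String> collected = jobResult.getAccumulatorResult("collected");
        System.out.println(collected);
    }
}

This keeps the blocking execute() call you want while avoiding LocalCollectionOutputFormat.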

Do you think it is better to use jarFiles argument on createRemoteEnvironment?
Yes, once we get it working this is the way to go.


On 26.04.2018 18:42, kedar mhaswade wrote:
Thanks Chesnay for your incredible help! 

I will try out the suggestions again. A few questions:
- What version of Flink are you trying with? I have had issues when I placed the gradoop-demo-shaded.jar in the lib folder of the Flink installation (1.4 even refused to start!).
- Are there other config changes (flink-conf.yaml) that you made in your cluster?
- Is org.apache.flink.api.common.io.FileOutputFormat a good alternative to LocalCollectionOutputFormat, or should I use HadoopOutputFormatCommonBase? (I do want to run the cluster on YARN later; at the moment I am trying on a standalone cluster.)
- Do you think using the jarFiles argument of createRemoteEnvironment (which deploys the JAR only for this job and does not mess with the entire Flink cluster) is a better option than placing the JAR(s) in the lib folder?

Thanks again,
Regards,
Kedar


On Thu, Apr 26, 2018 at 3:14 AM, Chesnay Schepler <[hidden email]> wrote:
Small update:

I could reproduce your problems locally when submitting the fat-jar.
I could get the job to run after placing the gradoop-demo-shaded.jar into the lib folder.
I have not yet tried placing only the gradoop jars into lib (but my guess is that you missed a gradoop jar).

Note that the job fails to run since you use "LocalCollectionOutputFormat" which can only be used for local execution, i.e. when the job submission and execution happen in the same JVM.
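
To illustrate the constraint (a sketch of the pattern, not your exact code):

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;

public class LocalCollectExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
        DataSet<String> data = env.fromElements("a", "b", "c");

        List<String> out = new ArrayList<>();
        // writes straight into a list held by the client JVM
        data.output(new LocalCollectionOutputFormat<>(out));
        env.execute();
        System.out.println(out); // only works because execution happens in this same JVM
    }
}

With a remote cluster, the TaskManagers cannot reach that client-side list, so this sink cannot work there.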


On 25.04.2018 14:23, kedar mhaswade wrote:
Thank you for your response!

I have not tried the flink run app.jar route because the way the app is set up does not allow me to do it. Basically, the app is a web application which serves the UI and also submits a Flink job for running Cypher queries. It is a proof-of-concept app, but IMO, a very useful one. 

Here's how you can reproduce:
1) git clone [hidden email] (this is my fork of gradoop_demo)
2) cd gradoop_demo
3) git checkout dev => dev is the branch containing my changes to make gradoop_demo work with the remote environment.
4) mvn clean package => should bring the gradoop JARs that this app needs; these JARs should then be placed in <flink-install>/lib.
5) cp ~/.m2/repository/org/gradoop/gradoop-common/0.3.2/gradoop-common-0.3.2.jar <flink-install>/lib, cp ~/.m2/repository/org/gradoop/gradoop-flink/0.3.2/gradoop-flink-0.3.2.jar <flink-install>/lib, cp target/gradoop-demo-0.2.0.jar <flink-install>/lib.
6) start the local Flink cluster: <flink-install>/bin/start-cluster.sh (I have tried with the latest built-from-source 1.6-SNAPSHOT and with 1.4) -- note the JM host and port
7) <gradoop-demo>/start.sh --jmhost <host> --jmport 6123 (adjust host and port per your cluster) => this is now configured to talk to the RemoteEnvironment at the given JM host and port.
8) hit the query button => this would throw the exception
9) Ctrl-C the process in 7 and just restart it as java -cp target/classes:target/gradoop-demo-shaded.jar org.gradoop.demo.server.Server => starts LocalEnvironment
10) do 8 again and see the results shown nicely in the browser.

Here is the relevant code:
1) Choosing between a Remote or a Local Environment.
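
In essence it does something like this (a paraphrased sketch, not the literal code in the repo; the flag handling is simplified):

import org.apache.flink.api.java.ExecutionEnvironment;

public class EnvironmentFactory {
    // jmHost/jmPort come from the --jmhost/--jmport flags passed to start.sh
    public static ExecutionEnvironment create(String jmHost, int jmPort) {
        if (jmHost != null) {
            return ExecutionEnvironment.createRemoteEnvironment(
                    jmHost, jmPort, "target/gradoop-demo-shaded.jar");
        }
        // no flags given: run everything in-process, as before
        return ExecutionEnvironment.createLocalEnvironment();
    }
}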

The instructions are correct to my knowledge. Thanks for your willingness to try. I have tried everything I can. With different Flink versions, I get different results (I have also tried 1.6-SNAPSHOT with the class loading configuration set to parent-first or child-first).

Regards,
Kedar


On Wed, Apr 25, 2018 at 1:08 AM, Chesnay Schepler <[hidden email]> wrote:
I couldn't spot any error in what you tried to do. Does the job-submission succeed if you submit the jar through the command-line client?

Can you share the project, or a minimal reproducing version?


On 25.04.2018 00:41, kedar mhaswade wrote:
I am trying to get gradoop_demo (a gradoop based graph visualization app) working on Flink with Remote Execution Environment.

This app, which is based on Gradoop, submits a job to the preconfigured execution environment, collects the results and sends it to the UI for rendering.

When the execution environment is configured to be a LocalEnvironment, everything works fine. But when I start a cluster (using <flink-install-path>/bin/start-cluster.sh), get the Job Manager endpoint (e.g. localhost:6123) and configure a RemoteEnvironment and use that environment to run the job, I get exceptions [1].

Based on the class loading doc, I copied the gradoop classes (gradoop-flink-0.3.3-SNAPSHOT.jar, gradoop-common-0.3.3-SNAPSHOT.jar) to the <flink-install-path>/lib folder (hoping that that way those classes will be available to all the executors in the cluster). I have ensured that the class that Flink fails to load is in fact available in the Gradoop jars that I copied to the /lib folder.

I have tried using the RemoteEnvironment method with jarFiles argument where the passed JAR file is a fat jar containing everything (in which case there is no Gradoop JAR file in /lib folder).

So, my questions are:
1) How can I use RemoteEnvironment?
2) Is there any other way of doing this programmatically? (That means I can't do flink run since I am interested in the job execution result as a blocking call -- which means ideally I don't want to use the submit RESTful API as well). I just want RemoteEnvironment to work as well as LocalEnvironment.

Regards,
Kedar


[1]
2018-04-24 15:16:02,823 ERROR org.apache.flink.runtime.jobmanager.JobManager                - Failed to submit job 0c987c8704f8b7eb4d7d38efcb3d708d (Flink Java Job at Tue Apr 24 15:15:59 PDT 2018)
java.lang.NoClassDefFoundError: Could not initialize class org.gradoop.common.model.impl.id.GradoopId
  at java.io.ObjectStreamClass.hasStaticInitializer(Native Method)
  at java.io.ObjectStreamClass.computeDefaultSUID(ObjectStreamClass.java:1887)
  at java.io.ObjectStreamClass.access$100(ObjectStreamClass.java:79)
  at java.io.ObjectStreamClass$1.run(ObjectStreamClass.java:263)
  at java.io.ObjectStreamClass$1.run(ObjectStreamClass.java:261)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.io.ObjectStreamClass.getSerialVersionUID(ObjectStreamClass.java:260)
  at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:682)
  at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1876)
  at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1745)
  at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1710)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1550)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
  at java.util.HashSet.readObject(HashSet.java:341)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
  at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)....