Akka configuration setting missing if RemoteEnvironment job is started from CLI


Lukas Kircher-2
Good morning,

I am having trouble using a Flink RemoteEnvironment in my Java application. If I run the following code [1] directly from the IDE, it runs as expected. However, if I package a jar and run it from the CLI via `java -cp ...`, I get the following error [2]. I want to use the RemoteEnvironment to run a sequence of Flink jobs that depend on each other in my application. I am using Flink 1.3.2.

Is there a difference between running this example from IDE vs CLI? Am I missing something? Did I forget to specify some configuration?

Thanks for your help,
Lukas



[1]
package flinkremote;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem;

public class RemoteEnvironmentTest {

    public static void main(final String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
                "127.0.0.1", 6123, "/tmp/flinkremote.jar");
        env
            .fromElements(1, 2, 3, 4, 5, 6)
            .filter(new FilterFunction<Integer>() {
                @Override
                public boolean filter(Integer value) throws Exception {
                    return value == 4;
                }
            })
            .writeAsText("/tmp/flinkremote.csv", FileSystem.WriteMode.OVERWRITE);
        env.execute("remote environment test");
    }
}

[2]
Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: Could not start the ActorSystem needed to talk to the JobManager.
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:461)
at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:429)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:211)
at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:188)
at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:172)
at flinkremote.RemoteEnvironmentTest.main(RemoteEnvironmentTest.java:23)
Caused by: org.apache.flink.util.FlinkException: Could not start the ActorSystem lazily.
at org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:230)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:459)
... 8 more
Caused by: com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'akka.remote.log-received-messages'
at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:124)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:145)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:151)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:151)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:159)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:164)
at com.typesafe.config.impl.SimpleConfig.getBoolean(SimpleConfig.java:174)
at akka.remote.RemoteSettings.<init>(RemoteSettings.scala:24)
at akka.remote.RemoteActorRefProvider.<init>(RemoteActorRefProvider.scala:114)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
at scala.util.Try$.apply(Try.scala:192)
at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
at scala.util.Success.flatMap(Try.scala:231)
at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)
at akka.actor.ActorSystemImpl.liftedTree1$1(ActorSystem.scala:585)
at akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:578)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:142)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:119)
at akka.actor.ActorSystem$.create(ActorSystem.scala:67)
at org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:104)
at org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:92)
at org.apache.flink.runtime.akka.AkkaUtils.createActorSystem(AkkaUtils.scala)
at org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:226)
... 9 more




Re: Akka configuration setting missing if RemoteEnvironment job is started from CLI

Lukas Kircher-2
Hi again,

Just FYI: I am not sure this is the correct explanation, but differing Flink jars seem to have been the problem. Here is how I got the RemoteEnvironment running on my setup:

* create a Maven project with the flink-quickstart-java archetype
* no changes to pom.xml
* add my custom job to the project, see [1]
* build with `mvn clean install -Pbuild-jar`
* run with `java -cp /tmp/flinkremote2.jar:/usr/local/Cellar/apache-flink/1.3.2/libexec/lib/* flinkremote2.RemoteEnvironmentTest`

Including the jars of the deployed Flink installation avoids dependency mismatches. In the example code I point the remote environment to the local host. To point it to a remote machine I just make sure I have an identical Flink installation on the machine that calls `java -cp ...` (i.e. include the exact same jars in the classpath).
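
To check for the kind of jar mismatch described above, one option is to print where the relevant classes are actually loaded from, once in the IDE and once from the CLI. The following is only a minimal sketch: the class `ClassOriginCheck` and its method `originOf` are hypothetical names, and the class names it inspects are taken from the imports and the stack trace in this thread.

```java
import java.security.CodeSource;

public class ClassOriginCheck {

    // Returns the jar or directory a class was loaded from, or a short note
    // if the class is missing or has no code source.
    static String originOf(String className) {
        try {
            // Load without running static initializers; we only want the location.
            Class<?> clazz = Class.forName(
                    className, false, ClassOriginCheck.class.getClassLoader());
            CodeSource source = clazz.getProtectionDomain().getCodeSource();
            // Classes that ship with the JDK typically have no code source.
            return source == null
                    ? "(no code source, e.g. JDK class)"
                    : String.valueOf(source.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        // Class names as they appear in the job code and the stack trace.
        String[] names = {
            "org.apache.flink.api.java.ExecutionEnvironment",
            "org.apache.flink.client.program.ClusterClient",
            "akka.actor.ActorSystem",
            "com.typesafe.config.impl.SimpleConfig"
        };
        for (String name : names) {
            System.out.println(name + " -> " + originOf(name));
        }
    }
}
```

Running this with the same `java -cp ...` classpath as the failing job, and again from the IDE, should show whether the two runs pick up these classes from different jars.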



[1] 
package flinkremote2;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.ExecutionEnvironment;

public class RemoteEnvironmentTest {

    public static void main(final String[] args) throws Exception {
        final String host = "127.0.0.1";
        final String jar = "/tmp/flinkremote2.jar";
        System.out.println(String.format("host: %s", host));
        final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(host, 6123, jar);
        env
            .fromElements(1, 2, 3, 4, 5, 6)
            .filter(new FilterFunction<Integer>() {
                @Override
                public boolean filter(Integer value) throws Exception {
                    return value == 4;
                }
            })
            // print() triggers execution itself, so no env.execute() call is needed
            .print();
    }
}
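
The root cause in the original stack trace is a missing `akka.remote.log-received-messages` setting. Typesafe Config builds its defaults from every `reference.conf` resource on the classpath, so a related check is to list those resources and see whether any of them mentions the setting. This is a rough sketch under the assumption that the default is expected to come from such a file. `ReferenceConfCheck` and its helpers are hypothetical names, and the check is a plain substring search, not real config parsing.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

public class ReferenceConfCheck {

    // Crude textual check: settings are nested inside reference.conf, so we
    // search for the last path segment rather than the full dotted key.
    static boolean mentionsSetting(String confText, String settingName) {
        return confText.contains(settingName);
    }

    // Reads a stream fully into a UTF-8 string (works on Java 8).
    static String readAll(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[8192];
        int n;
        while ((n = in.read(buf)) != -1) {
            out.write(buf, 0, n);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        ClassLoader loader = ReferenceConfCheck.class.getClassLoader();
        // All reference.conf resources visible to this classpath.
        List<URL> confs = Collections.list(loader.getResources("reference.conf"));
        System.out.println("reference.conf files on classpath: " + confs.size());
        for (URL url : confs) {
            try (InputStream in = url.openStream()) {
                boolean found = mentionsSetting(readAll(in), "log-received-messages");
                System.out.println((found ? "[has setting] " : "[no setting]  ") + url);
            }
        }
    }
}
```

If no listed file mentions the setting when run with the failing classpath, that would be consistent with the jar mismatch suspected above, though it does not prove it.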


On 16. Nov 2017, at 11:28, Lukas Kircher <[hidden email]> wrote:
