ArrayIndexOutOfBoundsException when running job from JAR


Mihail Vieru
Hi,

I get an ArrayIndexOutOfBoundsException when I run my job from a JAR in the CLI.
This doesn't occur in the IDE.

I've built the JAR using the "maven-shade-plugin" and the pom.xml configuration Robert provided here:
https://stackoverflow.com/questions/30102523/linkage-failure-when-running-apache-flink-jobs
I specify the entry point using the "-c" option.

The array the exception refers to is actually initialized when the vertices dataset is read from the file system.

Any ideas on what could cause this issue?

Best,
Mihail

P.S.: the stack trace:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
    at org.apache.flink.client.program.Client.run(Client.java:413)
    at org.apache.flink.client.program.Client.run(Client.java:356)
    at org.apache.flink.client.program.Client.run(Client.java:349)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
    at graphdistance.KAPSPNaiveJob.main(KAPSPNaiveJob.java:56)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
    at org.apache.flink.client.program.Client.run(Client.java:315)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
    at graphdistance.APSP$InitVerticesMapper.map(APSP.java:74)
    at graphdistance.APSP$InitVerticesMapper.map(APSP.java:48)
    at org.apache.flink.graph.Graph$2.map(Graph.java:389)
    at org.apache.flink.graph.Graph$2.map(Graph.java:387)
    at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)

Re: ArrayIndexOutOfBoundsException when running job from JAR

rmetzger0
Hi Mihail,

the exception has been thrown from graphdistance.APSP$InitVerticesMapper.map(APSP.java:74). I guess that is code written by you, or a library you are using.
Maybe the data you are using on the cluster is different from your local test data?

Best,
Robert



Re: ArrayIndexOutOfBoundsException when running job from JAR

Stephan Ewen
In reply to this post by Mihail Vieru
Looks like an exception in one of the Gelly functions.

Let's wait for someone from Gelly to jump in...


Re: ArrayIndexOutOfBoundsException when running job from JAR

Mihail Vieru
In reply to this post by rmetzger0
Hi Robert,

I'm using the same input data, as well as the same parameters I use in the IDE's run configuration.
I don't run the job on the cluster (yet), but locally, by starting Flink with the start-local.sh script.


I will try to explain my code a bit. The Integer[] array is initialized in the getVerticesDataSet() method.

        DataSet<Vertex<Integer, Tuple2<Integer[],String> >> vertices = getVerticesDataSet(env);
        ...
        Graph<Integer, Tuple2<Integer[],String>, NullValue> graph = Graph.fromDataSet(vertices, edges, env);
        ...
        Graph<Integer, Tuple2<Integer[],String>, NullValue> intermediateGraph =
                graph.run(new APSP<Integer>(srcVertexId, maxIterations));


In APSP I'm accessing it in the InitVerticesMapper, but it is now suddenly empty.
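
For reference, a minimal sketch of how such a method could look (hypothetical names and input format, not the actual job code; verticesInputPath and the line layout are assumptions):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.graph.Vertex;

    // Hypothetical sketch: parse one vertex per line and allocate its Integer[].
    private static DataSet<Vertex<Integer, Tuple2<Integer[], String>>> getVerticesDataSet(ExecutionEnvironment env) {
        return env.readTextFile(verticesInputPath)
                .map(new MapFunction<String, Vertex<Integer, Tuple2<Integer[], String>>>() {
                    @Override
                    public Vertex<Integer, Tuple2<Integer[], String>> map(String line) {
                        String[] tokens = line.split("\t");
                        Integer id = Integer.valueOf(tokens[0]);
                        // allocate and fill the per-vertex distance array
                        Integer[] distances = new Integer[tokens.length - 2];
                        for (int i = 0; i < distances.length; i++) {
                            distances[i] = Integer.valueOf(tokens[i + 1]);
                        }
                        String label = tokens[tokens.length - 1];
                        return new Vertex<Integer, Tuple2<Integer[], String>>(id,
                                new Tuple2<Integer[], String>(distances, label));
                    }
                });
    }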

Best,
Mihail


Re: ArrayIndexOutOfBoundsException when running job from JAR

Vasiliki Kalavri
Hi Mihail,

could you share your code or at least the implementations of getVerticesDataSet() and InitVerticesMapper so I can take a look?
Where is InitVerticesMapper called above?

Cheers,
Vasia.



Re: ArrayIndexOutOfBoundsException when running job from JAR

Mihail Vieru
Hi Vasia,

InitVerticesMapper is called in the run method of APSP:

    @Override
    public Graph<K, Tuple2<Integer[],String>, NullValue> run(Graph<K, Tuple2<Integer[],String>, NullValue> input) {

        VertexCentricConfiguration parameters = new VertexCentricConfiguration();
        parameters.setSolutionSetUnmanagedMemory(false);

        return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
                .runVertexCentricIteration(new VertexDistanceUpdater<K, Tuple2<Integer[],String>, Integer>(srcVertexId),
                        new MinDistanceMessenger<K, Tuple2<Integer[],String>, Integer, NullValue>(srcVertexId),
                        maxIterations, parameters);
    }

I'll send you the full code via a private e-mail.

Cheers,
Mihail


Re: ArrayIndexOutOfBoundsException when running job from JAR

Vasiliki Kalavri
Hi everyone,

Mihail and I have now solved the issue.

The exception occurred because the array size in question was read from a static field of the enclosing class inside an anonymous mapper. Making the mapper a standalone class and passing the array size to its constructor solved the issue.
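
In code, the pattern looked roughly like this (an illustrative sketch with made-up names, not the actual job; ids stands for some DataSet<Integer>):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;

    // Broken: the anonymous mapper reads a static field of the enclosing class.
    // The field is assigned in main(), which only runs in the client JVM, so a
    // TaskManager still sees the default value 0, the allocated array is empty,
    // and distances[0] throws ArrayIndexOutOfBoundsException: 0.
    static int arraySize; // set in main()

    DataSet<Integer[]> broken = ids.map(new MapFunction<Integer, Integer[]>() {
        @Override
        public Integer[] map(Integer id) {
            Integer[] distances = new Integer[arraySize]; // 0 on the TaskManager
            distances[0] = 0;
            return distances;
        }
    });

    // Fixed: a standalone class carries the size as an instance field, which is
    // serialized and shipped to the TaskManagers together with the function.
    public static class InitMapper implements MapFunction<Integer, Integer[]> {
        private final int arraySize;

        public InitMapper(int arraySize) {
            this.arraySize = arraySize;
        }

        @Override
        public Integer[] map(Integer id) {
            Integer[] distances = new Integer[arraySize]; // value traveled along
            distances[0] = 0;
            return distances;
        }
    }

    DataSet<Integer[]> fixed = ids.map(new InitMapper(arraySize));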

What I don't understand, though, is why this worked fine when the job was executed from inside the IDE. Is serialization handled differently (skipped) in this case?

Cheers,
Vasia.


Re: ArrayIndexOutOfBoundsException when running job from JAR

rmetzger0
It is working in the IDE because there we execute everything in the same JVM, so the mapper can access the correct value of the static variable.
When submitting a job with the CLI frontend, there are at least two JVMs involved, and code running in the JobManager/TaskManagers cannot access the value of the static variable set in the CLI frontend.
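
As an aside, besides a constructor parameter one can also ship such a value via the operator configuration, which Flink serializes together with the function. A sketch with illustrative names (ids and arraySize are assumptions):

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.configuration.Configuration;

    Configuration conf = new Configuration();
    conf.setInteger("arraySize", arraySize); // evaluated in the client JVM

    DataSet<Integer[]> result = ids
            .map(new RichMapFunction<Integer, Integer[]>() {
                private int size;

                @Override
                public void open(Configuration parameters) {
                    // called on the TaskManager with the shipped configuration
                    size = parameters.getInteger("arraySize", 0);
                }

                @Override
                public Integer[] map(Integer id) {
                    return new Integer[size];
                }
            })
            .withParameters(conf);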


Re: ArrayIndexOutOfBoundsException when running job from JAR

Vasiliki Kalavri
Thank you for the answer, Robert!

I realize it's a single JVM running, yet I would expect programs to behave in the same way, i.e. for serialization to happen (even when not strictly necessary), in order to catch this kind of bug before cluster deployment.
Is this simply not possible, or is it a design choice we made for some reason?

-V.


Re: ArrayIndexOutOfBoundsException when running job from JAR

Stephan Ewen
Static fields are not part of the serialized program (by Java's definition). Whether a static field has the same value in the cluster JVMs depends on how it is initialized, and whether it is initialized the same way in the shipped code, without the program's main method being run.
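
Concretely (an illustrative sketch): a field with a static initializer gets the same value in every JVM that loads the class, while a field assigned in main() keeps its default everywhere except in the client JVM:

    public class Job {
        // Initialized at class-load time, so it has the same value in every
        // JVM that loads the class (client, JobManager, TaskManagers).
        static final int K_FROM_INITIALIZER = 10;

        // Defaults to 0 at class-load time. The assignment below happens only
        // in the client JVM, because main() never runs on the TaskManagers.
        static int kFromMain;

        public static void main(String[] args) throws Exception {
            kFromMain = Integer.parseInt(args[0]);
            // ... build and execute the Flink program ...
        }
    }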


BTW: We are seeing more and more cases now where committers spend a lot of time debugging reported Flink bugs that are actually user bugs. At some point we need to think of a way to deal with this, as more and more such things are reported...

On Mon, Jun 29, 2015 at 11:02 AM, Vasiliki Kalavri <[hidden email]> wrote:
Thank you for the answer Robert!

I realize it's a single JVM running, yet I would expect programs to behave in the same way, i.e. serialization to happen (even if not necessary), in order to catch this kind of bugs before cluster deployment.
Is this simply not possible or is it a design choice we made for some reason?

-V.

On 29 June 2015 at 09:53, Robert Metzger <[hidden email]> wrote:
It is working in the IDE because there we execute everything in the same JVM, so the mapper can access the correct value of the static variable.
When submitting a job with the CLI frontend, there are at least two JVMs involved, and code running in the JM/TM can not access the value from the static variable in the Cli frontend.

On Sun, Jun 28, 2015 at 9:43 PM, Vasiliki Kalavri <[hidden email]> wrote:
Hi everyone,

Mihail and I have now solved the issue.

The exception was caused because the array size in question was read from a static field of the enclosing class, inside an anonymous mapper. Making the mapper a standalone class and passing the array size to the constructor solved the issue.

What I don't understand though, is why this worked fine when the job was executed from inside the IDE. Is serialization handled differently (skipped) in this case?

Cheers,
Vasia.

On 26 June 2015 at 11:30, Mihail Vieru <[hidden email]> wrote:
Hi Vasia,

InitVerticesMapper is called in the run method of APSP:

    @Override
    public Graph<K, Tuple2<Integer[],String>, NullValue> run(Graph<K, Tuple2<Integer[],String>, NullValue> input) {

        VertexCentricConfiguration parameters = new VertexCentricConfiguration();
        parameters.setSolutionSetUnmanagedMemory(false);
       
        return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
                .runVertexCentricIteration(new VertexDistanceUpdater<K, Tuple2<Integer[],String>, Integer>(srcVertexId),
                        new MinDistanceMessenger<K, Tuple2<Integer[],String>, Integer, NullValue>(srcVertexId),
                        maxIterations, parameters);
    }

I'll send you the full code via a private e-mail.

Cheers,
Mihail


On 26.06.2015 11:10, Vasiliki Kalavri wrote:
Hi Mihail,

could you share your code or at least the implementations of getVerticesDataSet() and InitVerticesMapper so I can take a look?
Where is InitVerticesMapper called above?

Cheers,
Vasia.


On 26 June 2015 at 10:51, Mihail Vieru <[hidden email]> wrote:
Hi Robert,

I'm using the same input data, as well as the same parameters I use in the IDE's run configuration.
I don't run the job on the cluster (yet), but locally, by starting Flink with the start-local.sh script.


I will try to explain my code a bit. The Integer[] array is initialized in the getVerticesDataSet() method.

        DataSet<Vertex<Integer, Tuple2<Integer[],String> >> vertices = getVerticesDataSet(env);
        ...
        Graph<Integer, Tuple2<Integer[],String>, NullValue> graph = Graph.fromDataSet(vertices, edges, env);
        ...
        Graph<Integer, Tuple2<Integer[],String>, NullValue> intermediateGraph =
                graph.run(new APSP<Integer>(srcVertexId, maxIterations));


In APSP I'm accessing it in the InitVerticesMapper, but there it is suddenly empty.

Best,
Mihail


On 26.06.2015 10:00, Robert Metzger wrote:
Hi Mihail,

the ArrayIndexOutOfBoundsException was thrown from graphdistance.APSP$InitVerticesMapper.map(APSP.java:74). I guess that is code written by you or a library you are using.
Maybe the data you are using on the cluster is different from your local test data?

Best,
Robert











Re: ArrayIndexOutOfBoundsException when running job from JAR

rmetzger0
@Stephan: I don't think there is a way to deal with this. In my understanding, the (main) purpose of the user@ list is not to report Flink bugs; it is a forum for users to help each other.
Flink committers happen to know a lot about the system, so it's easy for them to help users. It's also a good way to understand how users are using the system, which makes designing APIs and writing error messages and documentation easier.
I'm very happy to see that more and more non-committers are joining discussions here and helping other users.


