Fwd: Re: Submitting jobs from within Scala code


Fwd: Re: Submitting jobs from within Scala code

Philipp Goetze
Hey Tim,

I think my previous mail was intercepted or something similar. However, you can find my reply below. I already tried a simpler job which just does an env.fromElements..., but I still get the same stack trace.

How do you normally submit jobs (jars) from within the code?

Best Regards,
Philipp


-------- Forwarded Message --------
Subject: Re: Submitting jobs from within Scala code
Date: Thu, 16 Jul 2015 14:31:01 +0200
From: Philipp Goetze [hidden email]
To: [hidden email]


Hey,

from the JobManager I do not get any more hints:
13:36:06,674 DEBUG org.apache.flink.runtime.jobmanager.MemoryArchivist           - Received message RequestJobCounts at akka://flink/user/archive from Actor[akka://flink/temp/$gc].
13:36:06,674 DEBUG org.apache.flink.runtime.jobmanager.MemoryArchivist           - Handled message RequestJobCounts in 0 ms from Actor[akka://flink/temp/$gc].
13:36:06,674 DEBUG org.eclipse.jetty.util.log                                    - RESPONSE /jobsInfo  200
13:36:06,965 DEBUG org.apache.flink.runtime.jobmanager.JobManager                - Received message RequestBlobManagerPort at akka://flink/user/jobmanager from Actor[[hidden email]].
13:36:06,965 DEBUG org.apache.flink.runtime.jobmanager.JobManager                - Handled message RequestBlobManagerPort in 0 ms from Actor[[hidden email]].
13:36:06,984 DEBUG org.apache.flink.runtime.blob.BlobServerConnection            - Received PUT request for content addressable BLOB
13:36:07,086 DEBUG org.apache.flink.runtime.jobmanager.JobManager                - Received message SubmitJob(org.apache.flink.runtime.jobgraph.JobGraph@18746a85,true) at akka://flink/user/jobmanager from Actor[[hidden email]].
13:36:07,087 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Received job 146bc2162de9353bbd457a74eda59ae3 (Starting Query).
13:36:07,087 DEBUG org.apache.flink.runtime.jobmanager.JobManager                - Running initialization on master for job 146bc2162de9353bbd457a74eda59ae3 (Starting Query).
13:36:07,087 ERROR org.apache.flink.runtime.jobmanager.JobManager                - Failed to submit job 146bc2162de9353bbd457a74eda59ae3 (Starting Query)
org.apache.flink.runtime.client.JobSubmissionException: The vertex null (null) has no invokable class.
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:511)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
	at scala.collection.Iterator$class.foreach(Iterator.scala:743)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1195)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
	at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
13:36:07,088 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph        - Starting Query switched from CREATED to FAILING.
13:36:07,088 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph        - Starting Query switched from FAILING to FAILED.
13:36:07,088 DEBUG org.apache.flink.runtime.jobmanager.JobManager                - Handled message SubmitJob(org.apache.flink.runtime.jobgraph.JobGraph@18746a85,true) in 1 ms from Actor[[hidden email]].
13:36:07,524 DEBUG Remoting                                                      - Remote system with address [[hidden email]] has shut down. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters.



The code of the job is quite simple (just a test case). As stated before, it works when using the wrapper script and the web client. I think something is wrong in the submitJar method I posted earlier. But here is the code of the submitted job:

import org.apache.flink.api.scala._
import dbis.flink._

object load {
  def tupleAToString(t: List[Any]): String = {
    implicit def anyToSeq(a: Any) = a.asInstanceOf[Seq[Any]]
    val sb = new StringBuilder
    sb.append(t(0))
    sb.toString
  }

  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val A = PigStorage().load(env, "/home/blaze/Documents/TU_Ilmenau/Masterthesis/projects/pigspark/src/it/resources/file.txt", '\t')
    A.map(t => tupleAToString(t)).writeAsText("/home/blaze/Documents/TU_Ilmenau/Masterthesis/projects/pigspark/result1.out")
    env.execute("Starting Query")
  }
}

Best Regards,
Philipp

Re: Submitting jobs from within Scala code

Till Rohrmann

Hi Philipp,

What I usually do to run a Flink program on a cluster from within my IDE is create a RemoteExecutionEnvironment. Since I have a UDF defined (the map function which doubles the values), I also need to specify the jar containing this class. In my case, the jar is called test-1.0-SNAPSHOT.jar.

def main(args: Array[String]) {
    // set up the execution environment
    val env = ExecutionEnvironment.createRemoteEnvironment("localhost", 6123, "target/test-1.0-SNAPSHOT.jar")

    val elements = env.fromElements(1,2,3,4,5)

    val doubled = elements.map(x => 2*x)

    doubled.printOnTaskManager("TaskManager")

    // execute program
    env.execute("Flink Scala API Skeleton")
  }

But I also tried your approach for submitting jobs to Flink, and it worked for me as well. Therefore, I guess there is something wrong with your job. What happens in PigStorage().load?

Cheers,
Till


On Thu, Jul 16, 2015 at 4:35 PM, Philipp Goetze <[hidden email]> wrote:


Re: Submitting jobs from within Scala code

Philipp Goetze
Hi Till,

many thanks for your effort. I finally got it working.

I'm a bit embarrassed, because the issue was solved by using the same flink-dist JAR as the locally running Flink version. In other words, I had used an older snapshot version for compiling than for running :-[
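
One way to avoid such mismatches is to define the Flink version in a single place in the build, so that all compile-time artifacts match the flink-dist of the running cluster. A minimal sbt sketch (the version string and the selected artifacts are placeholders, not necessarily the ones used in this thread):

```scala
// build.sbt sketch: pin every Flink dependency to one version variable,
// which must equal the version of the cluster the job is submitted to.
val flinkVersion = "0.9.1" // placeholder; use the cluster's actual version

libraryDependencies ++= Seq(
  "org.apache.flink" % "flink-scala"   % flinkVersion,
  "org.apache.flink" % "flink-clients" % flinkVersion
)
```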

Best Regards,
Philipp

On 16.07.2015 17:35, Till Rohrmann wrote:




Re: Submitting jobs from within Scala code

Till Rohrmann
Good to hear that your problem is solved :-)

Cheers,
Till

On Thu, Jul 16, 2015 at 5:45 PM, Philipp Goetze <[hidden email]> wrote:




Re: Submitting jobs from within Scala code

Stephan Ewen
It seems that version mismatches are one of the most common sources of issues...

Maybe we should think about putting a version number into the messages (at least between client and JobManager) and failing fast on version mismatches...
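
The fail-fast idea could be sketched like this; VersionedSubmit and checkVersion are invented names for illustration, not Flink's actual messages:

```scala
// Hypothetical sketch of a version handshake between client and JobManager.
// The message type and field names are invented for illustration.
object VersionHandshake {
  final case class VersionedSubmit(clientVersion: String, jobName: String)

  // Version the JobManager was built with (placeholder value).
  val serverVersion = "0.9.1"

  // Reject the submission up front instead of failing later with a
  // hard-to-diagnose error such as "The vertex null (null) has no invokable class".
  def checkVersion(msg: VersionedSubmit): Either[String, String] =
    if (msg.clientVersion == serverVersion) Right(msg.jobName)
    else Left(s"Version mismatch: client ${msg.clientVersion}, JobManager $serverVersion")
}
```

A mismatching client would then receive an explicit version error before the job graph is even looked at.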

On Thu, Jul 16, 2015 at 5:56 PM, Till Rohrmann <[hidden email]> wrote:
13:36:07,524 DEBUG Remoting                                                      - Remote system with address [[hidden email]] has shut down. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters.



The code of the job is quite simple (just a test case). As stated before, it works when using the wrapper script and the web client, so I think something is wrong in the submitJar method I posted earlier. But here is the code of the submitted job:

import org.apache.flink.api.scala._
import dbis.flink._

object load {
  def tupleAToString(t: List[Any]): String = {
    implicit def anyToSeq(a: Any) = a.asInstanceOf[Seq[Any]]
    val sb = new StringBuilder
    sb.append(t(0))
    sb.toString
  }

  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val A = PigStorage().load(env, "/home/blaze/Documents/TU_Ilmenau/Masterthesis/projects/pigspark/src/it/resources/file.txt", '\t')
    A.map(t => tupleAToString(t)).writeAsText("/home/blaze/Documents/TU_Ilmenau/Masterthesis/projects/pigspark/result1.out")
    env.execute("Starting Query")
  }
}

Best Regards,
Philipp





Re: Submitting jobs from within Scala code

Till Rohrmann

This should be rather easy to add with the latest addition of the ActorGateway and the message decoration.


On Fri, Jul 17, 2015 at 5:04 PM, Stephan Ewen <[hidden email]> wrote:
Seems that version mismatches are one of the most common sources of issues...

Maybe we should think about putting a version number into the messages (at least between client and JobManager) and fail fast on version mismatches...
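To make the idea concrete, here is a minimal, purely illustrative Scala sketch of the suggestion above: decorate client/JobManager messages with a version string and fail fast on a mismatch, instead of failing later with a confusing error. All names here (VersionedProtocol, protocolVersion, SubmitJob, handle) are made up for illustration and are not Flink's actual protocol.

```scala
// Illustrative sketch only -- not Flink's actual messages.
object VersionedProtocol {
  // Version this build was compiled against (hypothetical constant).
  val protocolVersion = "0.9.1"

  // A submit message decorated with the sender's version.
  final case class SubmitJob(senderVersion: String, jobName: String)

  // Receiver-side check: reject mismatched clients immediately,
  // before any job-graph deserialization is attempted.
  def handle(msg: SubmitJob): Either[String, String] =
    if (msg.senderVersion == protocolVersion)
      Right(s"accepted ${msg.jobName}")
    else
      Left(s"version mismatch: client ${msg.senderVersion}, server $protocolVersion")
}
```

With such a check, a client built against an older snapshot would get an explicit "version mismatch" rejection rather than an opaque deserialization-related failure like the one in this thread.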

On Thu, Jul 16, 2015 at 5:56 PM, Till Rohrmann <[hidden email]> wrote:
Good to hear that your problem is solved :-)

Cheers,
Till

On Thu, Jul 16, 2015 at 5:45 PM, Philipp Goetze <[hidden email]> wrote:
Hi Till,

many thanks for your effort. I finally got it working.

I'm a bit embarrassed because the issue was solved by using the same flink-dist JAR as the locally running Flink version. In other words, I had compiled against an older snapshot version than the one I was running :-[
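For what it's worth, this kind of compile-time/runtime version skew can be avoided by keeping the Flink version in a single build setting, so every Flink dependency is derived from the same version string as the cluster. A hedged build.sbt sketch; the version string and artifact IDs are assumptions to adapt to your setup:

```scala
// build.sbt sketch -- illustrative only; pin this to the version your
// cluster actually runs so compile-time and runtime classes match.
val flinkVersion = "0.9.1"

libraryDependencies ++= Seq(
  "org.apache.flink" % "flink-scala"   % flinkVersion,
  "org.apache.flink" % "flink-clients" % flinkVersion
)
```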

Best Regards,
Philipp


On 16.07.2015 17:35, Till Rohrmann wrote:

Hi Philipp,

What I usually do to run a Flink program on a cluster from within my IDE is create a RemoteExecutionEnvironment. Since I have one UDF defined (the map function which doubles the values), I also need to specify the jar containing this class. In my case, the jar is called test-1.0-SNAPSHOT.jar.

import org.apache.flink.api.scala._

def main(args: Array[String]) {
  // set up a remote execution environment, shipping the jar with the UDF classes
  val env = ExecutionEnvironment.createRemoteEnvironment("localhost", 6123, "target/test-1.0-SNAPSHOT.jar")

  val elements = env.fromElements(1, 2, 3, 4, 5)

  val doubled = elements.map(x => 2 * x)

  doubled.printOnTaskManager("TaskManager")

  // execute program
  env.execute("Flink Scala API Skeleton")
}

But I also tried your approach of how to submit jobs to Flink and it worked for me as well. Therefore, I guess that there is something wrong with your job. What happens in PigStorage().load?

Cheers,
Till


On Thu, Jul 16, 2015 at 4:35 PM, Philipp Goetze <[hidden email]> wrote: