flink k-means on hadoop cluster

Re: flink k-means on hadoop cluster

Pa Rö
I start yarn-session.sh with sudo, then run the flink run command with sudo, and I get the following exception:

[cloudera@quickstart bin]$ sudo ./flink run /home/cloudera/Desktop/ma-flink.jar
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
org.apache.flink.client.program.ProgramInvocationException: Failed to resolve JobManager
    at org.apache.flink.client.program.Client.run(Client.java:378)
    at org.apache.flink.client.program.Client.run(Client.java:355)
    at org.apache.flink.client.program.Client.run(Client.java:348)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
    at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:70)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
    at org.apache.flink.client.program.Client.run(Client.java:315)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
Caused by: java.io.IOException: JobManager at akka.tcp://flink@127.0.0.1:6123/user/jobmanager not reachable. Please make sure that the JobManager is running and its port is reachable.
    at org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1198)
    at org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1222)
    at org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1240)
    at org.apache.flink.runtime.jobmanager.JobManager.getJobManagerRemoteReference(JobManager.scala)
    at org.apache.flink.client.program.Client.run(Client.java:375)
    ... 15 more
Caused by: akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@127.0.0.1:6123/), Path(/user/jobmanager)]
    at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
    at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
    at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
    at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
    at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
    at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
    at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
    at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
    at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
    at akka.remote.EndpointWriter.postStop(Endpoint.scala:561)
    at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
    at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:415)
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
    at akka.actor.ActorCell.terminate(ActorCell.scala:369)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Line 70 of FlinkMain.java is:
env.execute("KMeans Flink");
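A later reply in this thread ("i see my yarn end before i can run my app") suggests the YARN session was no longer running, which would explain why the client fell back to the default JobManager address 127.0.0.1:6123 and could not reach it. A minimal sketch of the intended sequence, under the assumption that both commands are run by the same user (no sudo) and the paths match those used in this thread:

```shell
# Terminal 1: start the YARN session and leave it running.
cd /home/cloudera/Desktop/flink-0.9-SNAPSHOT
export HADOOP_CONF_DIR=/usr/lib/hadoop/etc/hadoop/
./bin/yarn-session.sh -n 1 -jm 1024 -tm 1024

# Terminal 2: while the session is still up, submit as the SAME user,
# so the client finds the JobManager address in conf/.yarn-properties.
cd /home/cloudera/Desktop/flink-0.9-SNAPSHOT
./bin/flink run /home/cloudera/Desktop/ma-flink.jar
```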

2015-06-04 17:17 GMT+02:00 Pa Rö <[hidden email]>:
I tried this:

[cloudera@quickstart bin]$ sudo su yarn
bash-4.1$ hadoop fs -chmod 777 /user/cloudera/outputs
chmod: changing permissions of '/user/cloudera/outputs': Permission denied. user=yarn is not the owner of inode=outputs
bash-4.1$ hadoop fs -chmod 777 /user/cloudera/inputs
chmod: changing permissions of '/user/cloudera/inputs': Permission denied. user=yarn is not the owner of inode=inputs
bash-4.1$ exit
exit
[cloudera@quickstart bin]$ sudo ./flink run /home/cloudera/Desktop/ma-flink.jar
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Found YARN properties file /home/cloudera/Desktop/flink-0.9-SNAPSHOT/bin/../conf/.yarn-properties
Using JobManager address from YARN properties quickstart.cloudera/127.0.0.1:53874
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job 2f46ef5dff4ecf5552b3477ed1c6f4b9 (KMeans Flink)
    at org.apache.flink.client.program.Client.run(Client.java:412)
    at org.apache.flink.client.program.Client.run(Client.java:355)
    at org.apache.flink.client.program.Client.run(Client.java:348)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
    at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:70)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
    at org.apache.flink.client.program.Client.run(Client.java:315)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job 2f46ef5dff4ecf5552b3477ed1c6f4b9 (KMeans Flink)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:595)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:192)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:99)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:94)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: File /user/cloudera/inputs does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:162)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:471)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:535)
    ... 21 more
Caused by: java.io.FileNotFoundException: File /user/cloudera/inputs does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
    at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:106)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:390)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:51)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:146)
    ... 23 more
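The chmod attempts above fail because in HDFS only a path's owner (or the HDFS superuser) may change its permissions, and these directories belong to 'cloudera', not 'yarn'. One possible workaround, sketched under the assumption that the Cloudera quickstart VM has the usual 'hdfs' superuser account:

```shell
# Run the chmod as the HDFS superuser instead of 'yarn'
# (-R applies the mode recursively to everything under the directory).
sudo -u hdfs hadoop fs -chmod -R 777 /user/cloudera/inputs
sudo -u hdfs hadoop fs -chmod -R 777 /user/cloudera/outputs
```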


2015-06-04 17:15 GMT+02:00 Robert Metzger <[hidden email]>:
As the output of the "hadoop" tool indicates, it expects two arguments, but you only passed one (777).
The second argument it expects is the path of the file whose permissions you want to change.

In your case, it is:
hadoop fs -chmod 777 /user/cloudera/outputs


The reason why
hadoop fs -chmod 777 *
does not work is that the * is evaluated by your local bash and expanded to the files in your current, local directory. Bash glob expansion cannot see the files in HDFS.
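This is easy to demonstrate with the local shell alone, no HDFS needed (the directory and file names below are only examples):

```shell
# In a fresh local directory, bash expands * before the command runs,
# so whatever command you invoke sees local file names, never HDFS paths.
rm -rf /tmp/glob-demo && mkdir -p /tmp/glob-demo
cd /tmp/glob-demo
touch a.txt b.txt
echo *    # prints: a.txt b.txt (the expansion happened in bash, not echo)
```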


On Thu, Jun 4, 2015 at 5:08 PM, Pa Rö <[hidden email]> wrote:
[cloudera@quickstart bin]$ sudo su yarn
bash-4.1$ hadoop fs -chmod 777
-chmod: Not enough arguments: expected 2 but got 1
Usage: hadoop fs [generic options] -chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...
bash-4.1$

Does that make sense?

2015-06-04 17:04 GMT+02:00 Robert Metzger <[hidden email]>:
It looks like the user "yarn" which is running Flink doesn't have permission to access the files.

Can you do "sudo su yarn" to become the "yarn" user. Then, you can do "hadoop fs -chmod 777" to make the files accessible for everyone.


On Thu, Jun 4, 2015 at 4:59 PM, Pa Rö <[hidden email]> wrote:
Okay, it works now, but I get an exception:

[cloudera@quickstart Desktop]$ cd flink-0.9-SNAPSHOT/bin/
[cloudera@quickstart bin]$ flink run /home/cloudera/Desktop/ma-flink.jar
bash: flink: command not found
[cloudera@quickstart bin]$ ./flink run /home/cloudera/Desktop/ma-flink.jar
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Found YARN properties file /home/cloudera/Desktop/flink-0.9-SNAPSHOT/bin/../conf/.yarn-properties
Using JobManager address from YARN properties quickstart.cloudera/127.0.0.1:53874
java.io.IOException: Mkdirs failed to create /user/cloudera/outputs
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:438)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:886)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:783)
    at mgm.tp.bigdata.ma_commons.commons.Seeding.randomSeeding(Seeding.java:21)
    at mgm.tp.bigdata.ma_flink.FlinkMain.getCentroidDataSet(FlinkMain.java:178)
    at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:47)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
    at org.apache.flink.client.program.Client.run(Client.java:315)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job 934743a5c49c6d5e31c9e8201452e36d (KMeans Flink)
    at org.apache.flink.client.program.Client.run(Client.java:412)
    at org.apache.flink.client.program.Client.run(Client.java:355)
    at org.apache.flink.client.program.Client.run(Client.java:348)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
    at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:70)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
    at org.apache.flink.client.program.Client.run(Client.java:315)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job 934743a5c49c6d5e31c9e8201452e36d (KMeans Flink)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:595)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:192)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:99)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:94)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: File /user/cloudera/outputs/seed-1 does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:162)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:471)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:535)
    ... 21 more
Caused by: java.io.FileNotFoundException: File /user/cloudera/outputs/seed-1 does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
    at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:106)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:390)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:51)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:146)
    ... 23 more


How do I have to put the files into HDFS?
Like quickstart.cloudera:50075/home/cloudera/output?

2015-06-04 16:51 GMT+02:00 Robert Metzger <[hidden email]>:
Once you've started the YARN session, you can submit a Flink job with "./bin/flink run <pathToYourJar>".

The jar file of your job doesn't need to be in HDFS. It has to be in the local file system, and Flink will send it to all machines.

On Thu, Jun 4, 2015 at 4:48 PM, Pa Rö <[hidden email]> wrote:
Okay, now it runs on my Hadoop.
How can I start my Flink job? And where must the jar file be saved, in HDFS or as a local file?

2015-06-04 16:31 GMT+02:00 Robert Metzger <[hidden email]>:
Yes, you have to run these commands in the command line of the Cloudera VM.

On Thu, Jun 4, 2015 at 4:28 PM, Pa Rö <[hidden email]> wrote:
You mean run this command in the terminal/shell, and not define a Hue job?

2015-06-04 16:25 GMT+02:00 Robert Metzger <[hidden email]>:
It should certainly be possible to run Flink on a Cloudera Live VM.

I think these are the commands you need to execute:

wget http://stratosphere-bin.s3-website-us-east-1.amazonaws.com/flink-0.9-SNAPSHOT-bin-hadoop2.tgz
tar xvzf flink-0.9-SNAPSHOT-bin-hadoop2.tgz
cd flink-0.9-SNAPSHOT/
export HADOOP_CONF_DIR=/usr/lib/hadoop/etc/hadoop/
./bin/yarn-session.sh -n 1 -jm 1024 -tm 1024

If that is not working for you, please post the exact error message you are getting and I can help you to get it to run.


On Thu, Jun 4, 2015 at 4:18 PM, Pa Rö <[hidden email]> wrote:
Hi Robert,

I think the problem is the Hue API; I had the same problem with the Spark submit script, but the new Hue release has a Spark submit API.

I asked the group about the same problem with Spark, with no reply.

I want to test my app on a local cluster before I run it on the big cluster; for that I use Cloudera Live. Maybe there is another way to test Flink on a local cluster VM?

2015-06-04 16:12 GMT+02:00 Robert Metzger <[hidden email]>:
Hi Paul,

why did running Flink from the regular scripts not work for you?

I'm not an expert on Hue, I would recommend asking in the Hue user forum / mailing list: https://groups.google.com/a/cloudera.org/forum/#!forum/hue-user.

On Thu, Jun 4, 2015 at 4:09 PM, Pa Rö <[hidden email]> wrote:
Thanks.
Now I want to run my app on the Cloudera Live VM (single node).
How can I define my Flink job with Hue?
I tried to run the Flink script in HDFS, but it does not work.

best regards,
paul

2015-06-02 14:50 GMT+02:00 Robert Metzger <[hidden email]>:
I would recommend using HDFS.
For that, you need to specify the paths like this: hdfs:///path/to/data.
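To make data reachable under such a path, it first has to be copied into HDFS by its owner; a sketch, where the file name points.csv is only an example and not taken from this thread:

```shell
# Stage the input data in HDFS as the 'cloudera' user, then reference it
# from the Flink program with an hdfs:/// URI, e.g.
#   hdfs:///user/cloudera/inputs/points.csv
hadoop fs -mkdir -p /user/cloudera/inputs
hadoop fs -put /home/cloudera/Desktop/points.csv /user/cloudera/inputs/
hadoop fs -ls /user/cloudera/inputs
```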

On Tue, Jun 2, 2015 at 2:48 PM, Pa Rö <[hidden email]> wrote:
Nice.

Which file system must I use for the cluster: java.io, hadoop.fs, or Flink's own?

2015-06-02 14:29 GMT+02:00 Robert Metzger <[hidden email]>:
Hi,
you can start Flink on YARN on the Cloudera distribution.


These are the commands you need to execute:
wget http://stratosphere-bin.s3-website-us-east-1.amazonaws.com/flink-0.9-SNAPSHOT-bin-hadoop2.tgz
tar xvzf flink-0.9-SNAPSHOT-bin-hadoop2.tgz
cd flink-0.9-SNAPSHOT/
./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096




On Tue, Jun 2, 2015 at 2:03 PM, Pa Rö <[hidden email]> wrote:
Hi community,

I want to test my Flink k-means on a Hadoop cluster, using the Cloudera Live distribution. How can I run Flink on this cluster? Maybe only the Java dependencies are enough?

best regards,
paul

Re: flink k-means on hadoop cluster

Pa Rö
Sorry, I see my YARN session ended before I could run my app. I must set write access for yarn; maybe that solves my problem.

2015-06-04 17:33 GMT+02:00 Pa Rö <[hidden email]>:
i start the yarn-session.sh with sudo
and than the flink run command with sudo,
i get the following exception:

cloudera@quickstart bin]$ sudo ./flink run /home/cloudera/Desktop/ma-flink.jar
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
org.apache.flink.client.program.ProgramInvocationException: Failed to resolve JobManager
    at org.apache.flink.client.program.Client.run(Client.java:378)
    at org.apache.flink.client.program.Client.run(Client.java:355)
    at org.apache.flink.client.program.Client.run(Client.java:348)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
    at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:70)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
    at org.apache.flink.client.program.Client.run(Client.java:315)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
Caused by: java.io.IOException: JobManager at akka.tcp://flink@127.0.0.1:6123/user/jobmanager not reachable. Please make sure that the JobManager is running and its port is reachable.
    at org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1198)
    at org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1222)
    at org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1240)
    at org.apache.flink.runtime.jobmanager.JobManager.getJobManagerRemoteReference(JobManager.scala)
    at org.apache.flink.client.program.Client.run(Client.java:375)
    ... 15 more
Caused by: akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@127.0.0.1:6123/), Path(/user/jobmanager)]
    at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
    at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
    at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
    at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
    at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
    at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
    at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
    at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
    at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
    at akka.remote.EndpointWriter.postStop(Endpoint.scala:561)
    at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
    at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:415)
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
    at akka.actor.ActorCell.terminate(ActorCell.scala:369)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

the FlinkMain.java: 70 is:
env.execute("KMeans Flink");

2015-06-04 17:17 GMT+02:00 Pa Rö <[hidden email]>:
i try this:

[cloudera@quickstart bin]$ sudo su yarn
bash-4.1$ hadoop fs -chmod 777 /user/cloudera/outputs
chmod: changing permissions of '/user/cloudera/outputs': Permission denied. user=yarn is not the owner of inode=outputs
bash-4.1$ hadoop fs -chmod 777 /user/cloudera/inputs
chmod: changing permissions of '/user/cloudera/inputs': Permission denied. user=yarn is not the owner of inode=inputs
bash-4.1$ exit
exit
[cloudera@quickstart bin]$ sudo ./flink run /home/cloudera/Desktop/ma-flink.jar
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Found YARN properties file /home/cloudera/Desktop/flink-0.9-SNAPSHOT/bin/../conf/.yarn-properties
Using JobManager address from YARN properties quickstart.cloudera/127.0.0.1:53874
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job 2f46ef5dff4ecf5552b3477ed1c6f4b9 (KMeans Flink)
    at org.apache.flink.client.program.Client.run(Client.java:412)
    at org.apache.flink.client.program.Client.run(Client.java:355)
    at org.apache.flink.client.program.Client.run(Client.java:348)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
    at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:70)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
    at org.apache.flink.client.program.Client.run(Client.java:315)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job 2f46ef5dff4ecf5552b3477ed1c6f4b9 (KMeans Flink)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:595)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:192)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:99)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:94)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: File /user/cloudera/inputs does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:162)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:471)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:535)
    ... 21 more
Caused by: java.io.FileNotFoundException: File /user/cloudera/inputs does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
    at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:106)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:390)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:51)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:146)
    ... 23 more


2015-06-04 17:15 GMT+02:00 Robert Metzger <[hidden email]>:
As the output of the "hadoop" tool indicates, it expects two arguments, but you passed only one (777).
The second argument it expects is the path of the file you want to change.

In your case, it is:
hadoop fs -chmod 777 /user/cloudera/outputs


The reason why 
hadoop fs -chmod 777 *
does not work is the following: the * is evaluated by your local bash and expanded to the files present in your current, local directory. Bash expansion cannot expand to files in HDFS.
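A quick local demonstration of that expansion (temporary files only; the directory and file names here are illustrative):

```shell
# The shell expands * into matching LOCAL file names before the command runs,
# so "hadoop fs -chmod 777 *" would receive local names, never HDFS paths.
mkdir -p /tmp/glob-demo
cd /tmp/glob-demo
touch a.txt b.txt
echo chmod 777 *      # the command actually sees: chmod 777 a.txt b.txt
echo chmod 777 '*'    # quoting suppresses local expansion: chmod 777 *
```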


On Thu, Jun 4, 2015 at 5:08 PM, Pa Rö <[hidden email]> wrote:
[cloudera@quickstart bin]$ sudo su yarn
bash-4.1$ hadoop fs -chmod 777
-chmod: Not enough arguments: expected 2 but got 1
Usage: hadoop fs [generic options] -chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...
bash-4.1$

do you see what I mean?

2015-06-04 17:04 GMT+02:00 Robert Metzger <[hidden email]>:
It looks like the user "yarn" which is running Flink doesn't have permission to access the files.

Can you do "sudo su yarn" to become the "yarn" user? Then you can do "hadoop fs -chmod 777" to make the files accessible to everyone.
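For reference, HDFS uses the same POSIX-style permission bits as a local chmod. A minimal local sketch of what mode 777 grants (read/write/execute for owner, group, and others; assumes GNU coreutils `stat`):

```shell
# Create a throwaway file, open it up to everyone, and read back its mode.
f=$(mktemp)
chmod 777 "$f"
stat -c '%a' "$f"   # prints the octal mode: 777
rm -f "$f"
```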


On Thu, Jun 4, 2015 at 4:59 PM, Pa Rö <[hidden email]> wrote:
okay, it works, but I get an exception:

[cloudera@quickstart Desktop]$ cd flink-0.9-SNAPSHOT/bin/
[cloudera@quickstart bin]$ flink run /home/cloudera/Desktop/ma-flink.jar
bash: flink: command not found
[cloudera@quickstart bin]$ ./flink run /home/cloudera/Desktop/ma-flink.jar
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Found YARN properties file /home/cloudera/Desktop/flink-0.9-SNAPSHOT/bin/../conf/.yarn-properties
Using JobManager address from YARN properties quickstart.cloudera/127.0.0.1:53874
java.io.IOException: Mkdirs failed to create /user/cloudera/outputs
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:438)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:886)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:783)
    at mgm.tp.bigdata.ma_commons.commons.Seeding.randomSeeding(Seeding.java:21)
    at mgm.tp.bigdata.ma_flink.FlinkMain.getCentroidDataSet(FlinkMain.java:178)
    at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:47)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
    at org.apache.flink.client.program.Client.run(Client.java:315)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job 934743a5c49c6d5e31c9e8201452e36d (KMeans Flink)
    at org.apache.flink.client.program.Client.run(Client.java:412)
    at org.apache.flink.client.program.Client.run(Client.java:355)
    at org.apache.flink.client.program.Client.run(Client.java:348)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
    at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:70)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
    at org.apache.flink.client.program.Client.run(Client.java:315)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job 934743a5c49c6d5e31c9e8201452e36d (KMeans Flink)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:595)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:192)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:99)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:94)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: File /user/cloudera/outputs/seed-1 does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:162)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:471)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:535)
    ... 21 more
Caused by: java.io.FileNotFoundException: File /user/cloudera/outputs/seed-1 does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
    at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:106)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:390)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:51)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:146)
    ... 23 more


How must I set up the files in HDFS?
quickstart.cloudera:50075/home/cloudera/output?

2015-06-04 16:51 GMT+02:00 Robert Metzger <[hidden email]>:
Once you've started the YARN session, you can submit a Flink job with "./bin/flink run <pathToYourJar>".

The jar file of your job doesn't need to be in HDFS. It has to be in the local file system, and Flink will send it to all machines.

On Thu, Jun 4, 2015 at 4:48 PM, Pa Rö <[hidden email]> wrote:
okay, now it runs on my Hadoop cluster.
How can I start my Flink job? And where must the jar file be saved, in HDFS or as a local file?

2015-06-04 16:31 GMT+02:00 Robert Metzger <[hidden email]>:
Yes, you have to run these commands in the command line of the Cloudera VM.

On Thu, Jun 4, 2015 at 4:28 PM, Pa Rö <[hidden email]> wrote:
you mean I should run these commands in a terminal/shell and not define a Hue job?

2015-06-04 16:25 GMT+02:00 Robert Metzger <[hidden email]>:
It should certainly be possible to run Flink on a Cloudera Live VM.

I think these are the commands you need to execute:

wget http://stratosphere-bin.s3-website-us-east-1.amazonaws.com/flink-0.9-SNAPSHOT-bin-hadoop2.tgz
tar xvzf flink-0.9-SNAPSHOT-bin-hadoop2.tgz
cd flink-0.9-SNAPSHOT/
export HADOOP_CONF_DIR=/usr/lib/hadoop/etc/hadoop/
./bin/yarn-session.sh -n 1 -jm 1024 -tm 1024

If that is not working for you, please post the exact error message you are getting and I can help you to get it to run.


On Thu, Jun 4, 2015 at 4:18 PM, Pa Rö <[hidden email]> wrote:
hi robert,

I think the problem is the Hue API;
I had the same problem with the Spark submit script,
but the new Hue release has a Spark submit API.

I asked the group about the same problem with Spark, no reply.

I want to test my app on a local cluster before I run it on the big cluster;
for that I use Cloudera Live. Maybe there is another way to test Flink on a local cluster VM?

2015-06-04 16:12 GMT+02:00 Robert Metzger <[hidden email]>:
Hi Paul,

why did running Flink from the regular scripts not work for you?

I'm not an expert on Hue, I would recommend asking in the Hue user forum / mailing list: https://groups.google.com/a/cloudera.org/forum/#!forum/hue-user.

On Thu, Jun 4, 2015 at 4:09 PM, Pa Rö <[hidden email]> wrote:
thanks,
now I want to run my app on a single-node Cloudera Live VM.
How can I define my Flink job with Hue?
I tried to run the Flink script in HDFS; it does not work.

best regards,
paul

2015-06-02 14:50 GMT+02:00 Robert Metzger <[hidden email]>:
I would recommend using HDFS.
For that, you need to specify the paths like this: hdfs:///path/to/data.
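A small, self-contained sketch of why the scheme matters (plain `java.net.URI`; the paths are illustrative): Flink chooses the file system implementation from the URI scheme, so a path without `hdfs://` falls back to the local file system of whichever machine runs the task — which matches the `LocalFileSystem` frames in the stack traces in this thread.

```java
import java.net.URI;

public class SchemeDemo {
    public static void main(String[] args) {
        // "hdfs:///..." -> HDFS (empty authority = default namenode from the cluster config)
        // a bare "/..." -> no scheme, so the local file system is used
        URI hdfs = URI.create("hdfs:///user/cloudera/inputs");
        URI bare = URI.create("/user/cloudera/inputs");
        System.out.println("scheme=" + hdfs.getScheme() + " path=" + hdfs.getPath());
        System.out.println("scheme=" + bare.getScheme() + " path=" + bare.getPath());
    }
}
```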

On Tue, Jun 2, 2015 at 2:48 PM, Pa Rö <[hidden email]> wrote:
nice,

which file system must I use for the cluster? java.io or hadoop.fs or flink?

2015-06-02 14:29 GMT+02:00 Robert Metzger <[hidden email]>:
Hi,
you can start Flink on YARN on the Cloudera distribution.


These are the commands you need to execute
wget http://stratosphere-bin.s3-website-us-east-1.amazonaws.com/flink-0.9-SNAPSHOT-bin-hadoop2.tgz
tar xvzf flink-0.9-SNAPSHOT-bin-hadoop2.tgz
cd flink-0.9-SNAPSHOT/
./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096




On Tue, Jun 2, 2015 at 2:03 PM, Pa Rö <[hidden email]> wrote:
hi community,

I want to test my Flink k-means on a Hadoop cluster. I use the Cloudera Live distribution. How can I run Flink on this cluster? Maybe the Java dependencies alone are enough?

best regards,
paul



















Re: flink k-means on hadoop cluster

Pa Rö
I have changed the permissions as the cloudera user and tried the following commands,
and the files exist on HDFS ;) I set the files in my properties file like "flink.output=/user/cloudera/outputs/output_flink".
I get the same exception again; maybe the problem has another cause?

[cloudera@quickstart bin]$ sudo su yarn
bash-4.1$ hadoop fs -chmod 777 /user/cloudera/inputs
bash-4.1$ hadoop fs -chmod 777 /user/cloudera/outputs
bash-4.1$ exit
exit
[cloudera@quickstart bin]$ sudo ./flink run /home/cloudera/Desktop/ma-flink.jar
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Found YARN properties file /home/cloudera/Desktop/flink-0.9-SNAPSHOT/bin/../conf/.yarn-properties
Using JobManager address from YARN properties quickstart.cloudera/127.0.0.1:52601
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job fe78e9ee50cf76ac8b487919e1c951fa (KMeans Flink)
    at org.apache.flink.client.program.Client.run(Client.java:412)
    at org.apache.flink.client.program.Client.run(Client.java:355)
    at org.apache.flink.client.program.Client.run(Client.java:348)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
    at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:70)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
    at org.apache.flink.client.program.Client.run(Client.java:315)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job fe78e9ee50cf76ac8b487919e1c951fa (KMeans Flink)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:595)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:192)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:99)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:94)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: File /user/cloudera/inputs does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:162)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:471)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:535)
    ... 21 more
Caused by: java.io.FileNotFoundException: File /user/cloudera/inputs does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
    at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:106)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:390)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:51)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:146)
    ... 23 more


2015-06-04 17:38 GMT+02:00 Pa Rö <[hidden email]>:
sorry, I see my YARN session ends before I can run my app; I must set write access for yarn, maybe that solves my problem.

2015-06-04 17:33 GMT+02:00 Pa Rö <[hidden email]>:
I start yarn-session.sh with sudo
and then the flink run command with sudo,
and I get the following exception:

[cloudera@quickstart bin]$ sudo ./flink run /home/cloudera/Desktop/ma-flink.jar
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
org.apache.flink.client.program.ProgramInvocationException: Failed to resolve JobManager
    at org.apache.flink.client.program.Client.run(Client.java:378)
    at org.apache.flink.client.program.Client.run(Client.java:355)
    at org.apache.flink.client.program.Client.run(Client.java:348)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
    at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:70)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
    at org.apache.flink.client.program.Client.run(Client.java:315)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
Caused by: java.io.IOException: JobManager at akka.tcp://flink@127.0.0.1:6123/user/jobmanager not reachable. Please make sure that the JobManager is running and its port is reachable.
    at org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1198)
    at org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1222)
    at org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1240)
    at org.apache.flink.runtime.jobmanager.JobManager.getJobManagerRemoteReference(JobManager.scala)
    at org.apache.flink.client.program.Client.run(Client.java:375)
    ... 15 more
Caused by: akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@127.0.0.1:6123/), Path(/user/jobmanager)]
    at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
    at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
    at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
    at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
    at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
    at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
    at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
    at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
    at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
    at akka.remote.EndpointWriter.postStop(Endpoint.scala:561)
    at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
    at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:415)
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
    at akka.actor.ActorCell.terminate(ActorCell.scala:369)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

FlinkMain.java line 70 is:
env.execute("KMeans Flink");

2015-06-04 17:17 GMT+02:00 Pa Rö <[hidden email]>:
I tried this:

[cloudera@quickstart bin]$ sudo su yarn
bash-4.1$ hadoop fs -chmod 777 /user/cloudera/outputs
chmod: changing permissions of '/user/cloudera/outputs': Permission denied. user=yarn is not the owner of inode=outputs
bash-4.1$ hadoop fs -chmod 777 /user/cloudera/inputs
chmod: changing permissions of '/user/cloudera/inputs': Permission denied. user=yarn is not the owner of inode=inputs
bash-4.1$ exit
exit
[cloudera@quickstart bin]$ sudo ./flink run /home/cloudera/Desktop/ma-flink.jar
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Found YARN properties file /home/cloudera/Desktop/flink-0.9-SNAPSHOT/bin/../conf/.yarn-properties
Using JobManager address from YARN properties quickstart.cloudera/127.0.0.1:53874
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job 2f46ef5dff4ecf5552b3477ed1c6f4b9 (KMeans Flink)
    at org.apache.flink.client.program.Client.run(Client.java:412)
    at org.apache.flink.client.program.Client.run(Client.java:355)
    at org.apache.flink.client.program.Client.run(Client.java:348)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
    at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:70)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
    at org.apache.flink.client.program.Client.run(Client.java:315)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job 2f46ef5dff4ecf5552b3477ed1c6f4b9 (KMeans Flink)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:595)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:192)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:99)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:94)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: File /user/cloudera/inputs does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:162)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:471)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:535)
    ... 21 more
Caused by: java.io.FileNotFoundException: File /user/cloudera/inputs does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
    at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:106)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:390)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:51)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:146)
    ... 23 more


    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job 934743a5c49c6d5e31c9e8201452e36d (KMeans Flink)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:595)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:192)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:99)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:94)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: File /user/cloudera/outputs/seed-1 does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:162)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:471)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:535)
    ... 21 more
Caused by: java.io.FileNotFoundException: File /user/cloudera/outputs/seed-1 does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
    at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:106)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:390)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:51)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:146)
    ... 23 more


How must I set up the files in HDFS?
At quickstart.cloudera:50075/home/cloudera/output?

2015-06-04 16:51 GMT+02:00 Robert Metzger <[hidden email]>:
Once you've started the YARN session, you can submit a Flink job with "./bin/flink run <pathToYourJar>".

The jar file of your job doesn't need to be in HDFS. It has to be in the local file system, and Flink will ship it to all machines.

On Thu, Jun 4, 2015 at 4:48 PM, Pa Rö <[hidden email]> wrote:
Okay, now it runs on my Hadoop.
How can I start my Flink job? And where must the jar file be saved, in HDFS or as a local file?

2015-06-04 16:31 GMT+02:00 Robert Metzger <[hidden email]>:
Yes, you have to run these commands in the command line of the Cloudera VM.

On Thu, Jun 4, 2015 at 4:28 PM, Pa Rö <[hidden email]> wrote:
You mean run these commands in the terminal/shell and not define a Hue job?

2015-06-04 16:25 GMT+02:00 Robert Metzger <[hidden email]>:
It should certainly be possible to run Flink on a Cloudera Live VM.

I think these are the commands you need to execute:

wget http://stratosphere-bin.s3-website-us-east-1.amazonaws.com/flink-0.9-SNAPSHOT-bin-hadoop2.tgz
tar xvzf flink-0.9-SNAPSHOT-bin-hadoop2.tgz
cd flink-0.9-SNAPSHOT/
export HADOOP_CONF_DIR=/usr/lib/hadoop/etc/hadoop/
./bin/yarn-session.sh -n 1 -jm 1024 -tm 1024

If that is not working for you, please post the exact error message you are getting and I can help you to get it to run.


On Thu, Jun 4, 2015 at 4:18 PM, Pa Rö <[hidden email]> wrote:
Hi Robert,

I think the problem is the Hue API; I had the same problem with the Spark submit script, but the new Hue release has a Spark submit API.

I asked the group about the same problem with Spark and got no reply.

I want to test my app on a local cluster before I run it on the big cluster; for that I use Cloudera Live. Maybe there is another way to test Flink on a local cluster VM?

2015-06-04 16:12 GMT+02:00 Robert Metzger <[hidden email]>:
Hi Paul,

why did running Flink from the regular scripts not work for you?

I'm not an expert on Hue, I would recommend asking in the Hue user forum / mailing list: https://groups.google.com/a/cloudera.org/forum/#!forum/hue-user.

On Thu, Jun 4, 2015 at 4:09 PM, Pa Rö <[hidden email]> wrote:
Thanks.
Now I want to run my app on a single-node Cloudera Live VM.
How can I define my Flink job with Hue?
I tried to run the Flink script from HDFS; it doesn't work.

best regards,
paul

2015-06-02 14:50 GMT+02:00 Robert Metzger <[hidden email]>:
I would recommend using HDFS.
For that, you need to specify the paths like this: hdfs:///path/to/data.
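As an aside, the hdfs:/// form can be unpacked with plain java.net.URI — a JDK-only sketch on my part (no Flink dependency, the paths are made up) showing which part of the string selects the file system:

```java
import java.net.URI;

// Sketch: how a scheme-qualified path breaks down. A path without a scheme
// (e.g. "/user/cloudera/data") falls back to the default file system, which
// may be the local one rather than HDFS.
public class HdfsPathDemo {
    public static void main(String[] args) {
        URI qualified = URI.create("hdfs:///path/to/data");
        System.out.println(qualified.getScheme()); // hdfs -> HDFS file system
        System.out.println(qualified.getPath());   // /path/to/data

        URI unqualified = URI.create("/path/to/data");
        System.out.println(unqualified.getScheme()); // null -> default file system
    }
}
```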

On Tue, Jun 2, 2015 at 2:48 PM, Pa Rö <[hidden email]> wrote:
Nice.

Which file system must I use for the cluster? java.io, hadoop.fs, or Flink's?

2015-06-02 14:29 GMT+02:00 Robert Metzger <[hidden email]>:
Hi,
you can start Flink on YARN on the Cloudera distribution.


These are the commands you need to execute:
wget http://stratosphere-bin.s3-website-us-east-1.amazonaws.com/flink-0.9-SNAPSHOT-bin-hadoop2.tgz
tar xvzf flink-0.9-SNAPSHOT-bin-hadoop2.tgz
cd flink-0.9-SNAPSHOT/
./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096




On Tue, Jun 2, 2015 at 2:03 PM, Pa Rö <[hidden email]> wrote:
Hi community,

I want to test my Flink k-means on a Hadoop cluster. I use the Cloudera Live distribution. How can I run Flink on this cluster? Maybe the Java dependencies alone are enough?

best regards,
paul

Reply | Threaded
Open this post in threaded view
|

Re: flink k-means on hadoop cluster

Pa Rö
Here is my main class:
public static void main(String[] args) {
    // load properties
    Properties pro = new Properties();
    try {
        pro.load(FlinkMain.class.getResourceAsStream("/config.properties"));
    } catch (Exception e) {
        e.printStackTrace();
    }
    int maxIteration = Integer.parseInt(pro.getProperty("maxiterations"));
    String outputPath = pro.getProperty("flink.output");
    // set up execution environment
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // get input points
    DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
    DataSet<GeoTimeDataCenter> centroids = null;
    try {
        centroids = getCentroidDataSet(env);
    } catch (Exception e1) {
        e1.printStackTrace();
    }
    // set number of bulk iterations for KMeans algorithm
    IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
    DataSet<GeoTimeDataCenter> newCentroids = points
        // compute closest centroid for each point
        .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
        // count and sum point coordinates for each centroid
        .groupBy(0).reduceGroup(new CentroidAccumulator())
        // compute new centroids from point counts and coordinate sums
        .map(new CentroidAverager());
    // feed new centroids back into next iteration
    DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids);
    DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
        // assign points to final clusters
        .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
    // emit result
    clusteredPoints.writeAsCsv(outputPath + "/points", "\n", " ");
    finalCentroids.writeAsText(outputPath + "/centers"); //print();
    // execute program
    try {
        env.execute("KMeans Flink");
    } catch (Exception e) {
        e.printStackTrace();
    }
}
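As a side note on the properties handling above (a JDK-only sketch on my part, not from the thread; the default values are made up): getProperty returns null when a key is missing, so Integer.parseInt would throw. The two-argument overload avoids that:

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

// Sketch: getProperty(key, default) falls back to the supplied default
// instead of returning null when a key is absent from config.properties.
public class PropsDefaults {
    public static void main(String[] args) throws Exception {
        Properties pro = new Properties();
        pro.load(new ByteArrayInputStream(
                "maxiterations=10".getBytes(StandardCharsets.UTF_8)));
        // present key: parsed normally
        int maxIteration = Integer.parseInt(pro.getProperty("maxiterations", "1"));
        // absent key: falls back to the default instead of null
        String outputPath = pro.getProperty("flink.output", "hdfs:///tmp/output");
        System.out.println(maxIteration + " " + outputPath);
    }
}
```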
Maybe I can't use the following for HDFS?
clusteredPoints.writeAsCsv(outputPath+"/points", "\n", " ");
finalCentroids.writeAsText(outputPath+"/centers");//print();

2015-06-04 17:53 GMT+02:00 Pa Rö <[hidden email]>:
I have changed the permissions as the cloudera user and tried the following commands,
and the files exist in HDFS ;) I set the paths in my properties file like "flink.output=/user/cloudera/outputs/output_flink".
I get the same exception again; maybe the problem has another cause?
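One thing worth double-checking (an assumption on my side, not confirmed in the thread): the stack trace mentions LocalFileSystem, which suggests the scheme-less path is being resolved against the local file system rather than HDFS. A scheme-qualified variant of that properties entry would look like:

```properties
# sketch: same key as above, with an explicit hdfs:/// scheme
flink.output=hdfs:///user/cloudera/outputs/output_flink
```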

[cloudera@quickstart bin]$ sudo su yarn
bash-4.1$ hadoop fs -chmod 777 /user/cloudera/inputs
bash-4.1$ hadoop fs -chmod 777 /user/cloudera/outputs
bash-4.1$ exit
exit
[cloudera@quickstart bin]$ sudo ./flink run /home/cloudera/Desktop/ma-flink.jar
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Found YARN properties file /home/cloudera/Desktop/flink-0.9-SNAPSHOT/bin/../conf/.yarn-properties
Using JobManager address from YARN properties quickstart.cloudera/127.0.0.1:52601
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job fe78e9ee50cf76ac8b487919e1c951fa (KMeans Flink)
    at org.apache.flink.client.program.Client.run(Client.java:412)
    at org.apache.flink.client.program.Client.run(Client.java:355)
    at org.apache.flink.client.program.Client.run(Client.java:348)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
    at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:70)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
    at org.apache.flink.client.program.Client.run(Client.java:315)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job fe78e9ee50cf76ac8b487919e1c951fa (KMeans Flink)

    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:595)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:192)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:99)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:94)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: File /user/cloudera/inputs does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:162)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:471)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:535)
    ... 21 more
Caused by: java.io.FileNotFoundException: File /user/cloudera/inputs does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
    at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:106)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:390)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:51)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:146)
    ... 23 more


2015-06-04 17:38 GMT+02:00 Pa Rö <[hidden email]>:
Sorry, I see my YARN session ended before I could run my app. I must set write access for yarn; maybe this solves my problem.

2015-06-04 17:33 GMT+02:00 Pa Rö <[hidden email]>:
I start yarn-session.sh with sudo
and then run the flink run command with sudo,
and I get the following exception:

[cloudera@quickstart bin]$ sudo ./flink run /home/cloudera/Desktop/ma-flink.jar
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
org.apache.flink.client.program.ProgramInvocationException: Failed to resolve JobManager
    at org.apache.flink.client.program.Client.run(Client.java:378)
    at org.apache.flink.client.program.Client.run(Client.java:355)
    at org.apache.flink.client.program.Client.run(Client.java:348)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
    at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:70)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
    at org.apache.flink.client.program.Client.run(Client.java:315)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
Caused by: java.io.IOException: JobManager at akka.tcp://flink@127.0.0.1:6123/user/jobmanager not reachable. Please make sure that the JobManager is running and its port is reachable.
    at org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1198)
    at org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1222)
    at org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1240)
    at org.apache.flink.runtime.jobmanager.JobManager.getJobManagerRemoteReference(JobManager.scala)
    at org.apache.flink.client.program.Client.run(Client.java:375)
    ... 15 more
Caused by: akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@127.0.0.1:6123/), Path(/user/jobmanager)]
    at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
    at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
    at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
    at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
    at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
    at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
    at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
    at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
    at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
    at akka.remote.EndpointWriter.postStop(Endpoint.scala:561)
    at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
    at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:415)
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
    at akka.actor.ActorCell.terminate(ActorCell.scala:369)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

FlinkMain.java:70 is:
env.execute("KMeans Flink");

2015-06-04 17:17 GMT+02:00 Pa Rö <[hidden email]>:
I tried this:

[cloudera@quickstart bin]$ sudo su yarn
bash-4.1$ hadoop fs -chmod 777 /user/cloudera/outputs
chmod: changing permissions of '/user/cloudera/outputs': Permission denied. user=yarn is not the owner of inode=outputs
bash-4.1$ hadoop fs -chmod 777 /user/cloudera/inputs
chmod: changing permissions of '/user/cloudera/inputs': Permission denied. user=yarn is not the owner of inode=inputs
bash-4.1$ exit
exit
[cloudera@quickstart bin]$ sudo ./flink run /home/cloudera/Desktop/ma-flink.jar
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Found YARN properties file /home/cloudera/Desktop/flink-0.9-SNAPSHOT/bin/../conf/.yarn-properties
Using JobManager address from YARN properties quickstart.cloudera/127.0.0.1:53874
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job 2f46ef5dff4ecf5552b3477ed1c6f4b9 (KMeans Flink)
    at org.apache.flink.client.program.Client.run(Client.java:412)
    at org.apache.flink.client.program.Client.run(Client.java:355)
    at org.apache.flink.client.program.Client.run(Client.java:348)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
    at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:70)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
    at org.apache.flink.client.program.Client.run(Client.java:315)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job 2f46ef5dff4ecf5552b3477ed1c6f4b9 (KMeans Flink)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:595)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:192)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:99)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:94)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: File /user/cloudera/inputs does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:162)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:471)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:535)
    ... 21 more
Caused by: java.io.FileNotFoundException: File /user/cloudera/inputs does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
    at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:106)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:390)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:51)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:146)
    ... 23 more


2015-06-04 17:15 GMT+02:00 Robert Metzger <[hidden email]>:
As the output of the "hadoop" tool indicates, it expects two arguments; you only passed one (777).
The second argument it expects is the path of the file whose permissions you want to change.

In your case, it is:
hadoop fs -chmod 777 /user/cloudera/outputs


The reason why
hadoop fs -chmod 777 *
does not work is the following: the * is evaluated by your local bash and expanded to the files present in your current, local directory. Bash expansion cannot see the files in HDFS.
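This local-glob behaviour is easy to demonstrate without a cluster — a small sketch in a throwaway directory, with echo standing in for hadoop so nothing is actually changed:

```shell
# The shell expands * against the LOCAL working directory before the command
# runs, so hadoop would receive local file names, not HDFS paths.
demo_dir=$(mktemp -d)
cd "$demo_dir"
touch local-a.txt local-b.txt
echo hadoop fs -chmod 777 *
# prints: hadoop fs -chmod 777 local-a.txt local-b.txt
```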


    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: File /user/cloudera/outputs/seed-1 does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:162)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:471)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:535)
    ... 21 more
Caused by: java.io.FileNotFoundException: File /user/cloudera/outputs/seed-1 does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
    at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:106)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:390)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:51)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:146)
    ... 23 more


How do I have to set up the files in HDFS?
quickstart.cloudera:50075/home/cloudera/output?

2015-06-04 16:51 GMT+02:00 Robert Metzger <[hidden email]>:
Once you've started the YARN session, you can submit a Flink job with "./bin/flink run <pathToYourJar>".

The jar file of your job doesn't need to be in HDFS. It has to be in the local file system and Flink will send it to all machines.

On Thu, Jun 4, 2015 at 4:48 PM, Pa Rö <[hidden email]> wrote:
Okay, now it runs on my Hadoop.
How can I start my Flink job? And where does the jar file have to be saved, in HDFS or as a local file?

2015-06-04 16:31 GMT+02:00 Robert Metzger <[hidden email]>:
Yes, you have to run these commands in the command line of the Cloudera VM.

On Thu, Jun 4, 2015 at 4:28 PM, Pa Rö <[hidden email]> wrote:
You mean run these commands in a terminal/shell and not define a Hue job?

2015-06-04 16:25 GMT+02:00 Robert Metzger <[hidden email]>:
It should certainly be possible to run Flink on a Cloudera Live VM.

I think these are the commands you need to execute:

wget http://stratosphere-bin.s3-website-us-east-1.amazonaws.com/flink-0.9-SNAPSHOT-bin-hadoop2.tgz
tar xvzf flink-0.9-SNAPSHOT-bin-hadoop2.tgz
cd flink-0.9-SNAPSHOT/
export HADOOP_CONF_DIR=/usr/lib/hadoop/etc/hadoop/
./bin/yarn-session.sh -n 1 -jm 1024 -tm 1024

If that is not working for you, please post the exact error message you are getting and I can help you to get it to run.


On Thu, Jun 4, 2015 at 4:18 PM, Pa Rö <[hidden email]> wrote:
Hi Robert,

I think the problem is the Hue API;
I had the same problem with the Spark submit script,
but the new Hue release has a Spark submit API.

I asked the group about the same problem with Spark, no reply.

I want to test my app on a local cluster before I run it on the big cluster;
for that I use Cloudera Live. Maybe there is another way to test Flink on a local cluster VM?

2015-06-04 16:12 GMT+02:00 Robert Metzger <[hidden email]>:
Hi Paul,

why did running Flink from the regular scripts not work for you?

I'm not an expert on Hue, I would recommend asking in the Hue user forum / mailing list: https://groups.google.com/a/cloudera.org/forum/#!forum/hue-user.

On Thu, Jun 4, 2015 at 4:09 PM, Pa Rö <[hidden email]> wrote:
Thanks,
now I want to run my app on the Cloudera Live VM (single node).
How can I define my Flink job with Hue?
I tried to run the Flink script in HDFS; it doesn't work.

best regards,
paul

2015-06-02 14:50 GMT+02:00 Robert Metzger <[hidden email]>:
I would recommend using HDFS.
For that, you need to specify the paths like this: hdfs:///path/to/data.

On Tue, Jun 2, 2015 at 2:48 PM, Pa Rö <[hidden email]> wrote:
Nice,

which file system do I have to use for the cluster? java.io, hadoop.fs, or Flink's?

2015-06-02 14:29 GMT+02:00 Robert Metzger <[hidden email]>:
Hi,
you can start Flink on YARN on the Cloudera distribution.


These are the commands you need to execute
wget http://stratosphere-bin.s3-website-us-east-1.amazonaws.com/flink-0.9-SNAPSHOT-bin-hadoop2.tgz
tar xvzf flink-0.9-SNAPSHOT-bin-hadoop2.tgz
cd flink-0.9-SNAPSHOT/
./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096




On Tue, Jun 2, 2015 at 2:03 PM, Pa Rö <[hidden email]> wrote:
Hi community,

I want to test my Flink k-means on a Hadoop cluster. I use the Cloudera Live distribution. How can I run Flink on this cluster? Maybe only the Java dependencies are enough?

best regards,
paul



Re: flink k-means on hadoop cluster

rmetzger0
No, the permissions are still not correct; otherwise Flink would not complain.

The error message of Flink is actually pretty precise: "Caused by: java.io.FileNotFoundException: File /user/cloudera/inputs does not exist or the user running Flink ('yarn') has insufficient permissions to access it."

Does the file exist, and does the user "yarn" have permission to access it?

On Thu, Jun 4, 2015 at 5:57 PM, Pa Rö <[hidden email]> wrote:
Here is my main class:
public static void main(String[] args) {
    // load properties
    Properties pro = new Properties();
    try {
        pro.load(FlinkMain.class.getResourceAsStream("/config.properties"));
    } catch (Exception e) {
        e.printStackTrace();
    }
    int maxIteration = Integer.parseInt(pro.getProperty("maxiterations"));
    String outputPath = pro.getProperty("flink.output");
    // set up execution environment
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // get input points
    DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
    DataSet<GeoTimeDataCenter> centroids = null;
    try {
        centroids = getCentroidDataSet(env);
    } catch (Exception e1) {
        e1.printStackTrace();
    }
    // set number of bulk iterations for KMeans algorithm
    IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
    DataSet<GeoTimeDataCenter> newCentroids = points
        // compute closest centroid for each point
        .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
        // count and sum point coordinates for each centroid
        .groupBy(0).reduceGroup(new CentroidAccumulator())
        // compute new centroids from point counts and coordinate sums
        .map(new CentroidAverager());
    // feed new centroids back into next iteration
    DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids);
    DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
        // assign points to final clusters
        .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
    // emit result
    clusteredPoints.writeAsCsv(outputPath + "/points", "\n", " ");
    finalCentroids.writeAsText(outputPath + "/centers"); //print();
    // execute program
    try {
        env.execute("KMeans Flink");
    } catch (Exception e) {
        e.printStackTrace();
    }
}
Maybe I can't use the following with HDFS?
clusteredPoints.writeAsCsv(outputPath+"/points", "\n", " ");
finalCentroids.writeAsText(outputPath+"/centers");//print();

2015-06-04 17:53 GMT+02:00 Pa Rö <[hidden email]>:
I changed the permissions for the cloudera user and tried the following commands,
and the files exist on HDFS ;) I set the paths in my properties file like "flink.output=/user/cloudera/outputs/output_flink".
I get the same exception again; maybe the problem has another cause?

[cloudera@quickstart bin]$ sudo su yarn
bash-4.1$ hadoop fs -chmod 777 /user/cloudera/inputs
bash-4.1$ hadoop fs -chmod 777 /user/cloudera/outputs
bash-4.1$ exit
exit
[cloudera@quickstart bin]$ sudo ./flink run /home/cloudera/Desktop/ma-flink.jar
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Found YARN properties file /home/cloudera/Desktop/flink-0.9-SNAPSHOT/bin/../conf/.yarn-properties
Using JobManager address from YARN properties quickstart.cloudera/127.0.0.1:52601
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job fe78e9ee50cf76ac8b487919e1c951fa (KMeans Flink)
    at org.apache.flink.client.program.Client.run(Client.java:412)
    at org.apache.flink.client.program.Client.run(Client.java:355)
    at org.apache.flink.client.program.Client.run(Client.java:348)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
    at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:70)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
    at org.apache.flink.client.program.Client.run(Client.java:315)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job fe78e9ee50cf76ac8b487919e1c951fa (KMeans Flink)

    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:595)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:192)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:99)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:94)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: File /user/cloudera/inputs does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:162)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:471)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:535)
    ... 21 more
Caused by: java.io.FileNotFoundException: File /user/cloudera/inputs does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
    at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:106)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:390)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:51)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:146)
    ... 23 more
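A note on the chmod attempt above: `hadoop fs -chmod` without `-R` changes only the directory itself, not the entries inside it, so files created earlier inside `/user/cloudera/inputs` or `/user/cloudera/outputs` (for example `outputs/seed-1`) can keep their old mode even after a `chmod 777` on the parent. A minimal local sketch of the same semantics, using plain `chmod` instead of `hadoop fs -chmod` so it runs without a cluster:

```shell
# A non-recursive chmod on a directory leaves the permissions of the
# files inside it untouched; -R is needed to descend into the directory.
d=$(mktemp -d)
mkdir "$d/outputs"
touch "$d/outputs/seed-1"
chmod 700 "$d/outputs/seed-1"

chmod 777 "$d/outputs"             # non-recursive: only the directory changes
stat -c '%a' "$d/outputs/seed-1"   # still 700

chmod -R 777 "$d/outputs"          # recursive: also changes seed-1
stat -c '%a' "$d/outputs/seed-1"   # now 777
```

If that is the cause here, the HDFS equivalent would be `hadoop fs -chmod -R 777 /user/cloudera/inputs /user/cloudera/outputs` (note the `-R`).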


2015-06-04 17:38 GMT+02:00 Pa Rö <[hidden email]>:
Sorry, I see my YARN session ended before I could run my app. I have to set write access for yarn; maybe that solves my problem.

2015-06-04 17:33 GMT+02:00 Pa Rö <[hidden email]>:
I started yarn-session.sh with sudo
and then the flink run command with sudo;
I get the following exception:

cloudera@quickstart bin]$ sudo ./flink run /home/cloudera/Desktop/ma-flink.jar
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
org.apache.flink.client.program.ProgramInvocationException: Failed to resolve JobManager
    at org.apache.flink.client.program.Client.run(Client.java:378)
    at org.apache.flink.client.program.Client.run(Client.java:355)
    at org.apache.flink.client.program.Client.run(Client.java:348)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
    at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:70)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
    at org.apache.flink.client.program.Client.run(Client.java:315)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
Caused by: java.io.IOException: JobManager at akka.tcp://flink@127.0.0.1:6123/user/jobmanager not reachable. Please make sure that the JobManager is running and its port is reachable.
    at org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1198)
    at org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1222)
    at org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1240)
    at org.apache.flink.runtime.jobmanager.JobManager.getJobManagerRemoteReference(JobManager.scala)
    at org.apache.flink.client.program.Client.run(Client.java:375)
    ... 15 more
Caused by: akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@127.0.0.1:6123/), Path(/user/jobmanager)]
    at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
    at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
    at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
    at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
    at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
    at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
    at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
    at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
    at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
    at akka.remote.EndpointWriter.postStop(Endpoint.scala:561)
    at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
    at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:415)
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
    at akka.actor.ActorCell.terminate(ActorCell.scala:369)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

FlinkMain.java:70 is:
env.execute("KMeans Flink");

2015-06-04 17:17 GMT+02:00 Pa Rö <[hidden email]>:
I tried this:

[cloudera@quickstart bin]$ sudo su yarn
bash-4.1$ hadoop fs -chmod 777 /user/cloudera/outputs
chmod: changing permissions of '/user/cloudera/outputs': Permission denied. user=yarn is not the owner of inode=outputs
bash-4.1$ hadoop fs -chmod 777 /user/cloudera/inputs
chmod: changing permissions of '/user/cloudera/inputs': Permission denied. user=yarn is not the owner of inode=inputs
bash-4.1$ exit
exit
[cloudera@quickstart bin]$ sudo ./flink run /home/cloudera/Desktop/ma-flink.jar
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Found YARN properties file /home/cloudera/Desktop/flink-0.9-SNAPSHOT/bin/../conf/.yarn-properties
Using JobManager address from YARN properties quickstart.cloudera/127.0.0.1:53874
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job 2f46ef5dff4ecf5552b3477ed1c6f4b9 (KMeans Flink)
    at org.apache.flink.client.program.Client.run(Client.java:412)
    at org.apache.flink.client.program.Client.run(Client.java:355)
    at org.apache.flink.client.program.Client.run(Client.java:348)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
    at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:70)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
    at org.apache.flink.client.program.Client.run(Client.java:315)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job 2f46ef5dff4ecf5552b3477ed1c6f4b9 (KMeans Flink)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:595)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:192)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:99)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:94)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: File /user/cloudera/inputs does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:162)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:471)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:535)
    ... 21 more
Caused by: java.io.FileNotFoundException: File /user/cloudera/inputs does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
    at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:106)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:390)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:51)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:146)
    ... 23 more


2015-06-04 17:15 GMT+02:00 Robert Metzger <[hidden email]>:
As the output of the "hadoop" tool indicates, it expects two arguments; you only passed one (777).
The second argument it expects is the path to the file whose permissions you want to change.

In your case, it is:
hadoop fs -chmod 777 /user/cloudera/outputs


The reason why 
hadoop fs -chmod 777 *
does not work is the following: the * is evaluated by your local bash and expanded to the files which are present in your current, local directory. The bash expansion is not able to expand to the files in HDFS.
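The point about the shell expanding `*` locally can be reproduced without any cluster; a quick sketch (the temp directory and file names are made up):

```shell
# The shell expands the glob against the local working directory before
# the command ever runs, so "hadoop fs -chmod 777 *" would receive local
# file names as arguments, not HDFS paths.
d=$(mktemp -d)
cd "$d"
touch a.txt b.txt
echo *   # the shell has already rewritten this into: echo a.txt b.txt
```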


On Thu, Jun 4, 2015 at 5:08 PM, Pa Rö <[hidden email]> wrote:
[cloudera@quickstart bin]$ sudo su yarn
bash-4.1$ hadoop fs -chmod 777
-chmod: Not enough arguments: expected 2 but got 1
Usage: hadoop fs [generic options] -chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...
bash-4.1$

Do you understand?

2015-06-04 17:04 GMT+02:00 Robert Metzger <[hidden email]>:
It looks like the user "yarn" which is running Flink doesn't have permission to access the files.

Can you do "sudo su yarn" to become the "yarn" user? Then, you can do "hadoop fs -chmod 777" to make the files accessible for everyone.


On Thu, Jun 4, 2015 at 4:59 PM, Pa Rö <[hidden email]> wrote:
Okay, it works now, but I get an exception:

[cloudera@quickstart Desktop]$ cd flink-0.9-SNAPSHOT/bin/
[cloudera@quickstart bin]$ flink run /home/cloudera/Desktop/ma-flink.jar
bash: flink: command not found
[cloudera@quickstart bin]$ ./flink run /home/cloudera/Desktop/ma-flink.jar
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Found YARN properties file /home/cloudera/Desktop/flink-0.9-SNAPSHOT/bin/../conf/.yarn-properties
Using JobManager address from YARN properties quickstart.cloudera/127.0.0.1:53874
java.io.IOException: Mkdirs failed to create /user/cloudera/outputs
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:438)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:886)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:783)
    at mgm.tp.bigdata.ma_commons.commons.Seeding.randomSeeding(Seeding.java:21)
    at mgm.tp.bigdata.ma_flink.FlinkMain.getCentroidDataSet(FlinkMain.java:178)
    at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:47)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
    at org.apache.flink.client.program.Client.run(Client.java:315)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job 934743a5c49c6d5e31c9e8201452e36d (KMeans Flink)
    at org.apache.flink.client.program.Client.run(Client.java:412)
    at org.apache.flink.client.program.Client.run(Client.java:355)
    at org.apache.flink.client.program.Client.run(Client.java:348)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
    at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:70)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
    at org.apache.flink.client.program.Client.run(Client.java:315)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job 934743a5c49c6d5e31c9e8201452e36d (KMeans Flink)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:595)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:192)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:99)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:94)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: File /user/cloudera/outputs/seed-1 does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:162)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:471)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:535)
    ... 21 more
Caused by: java.io.FileNotFoundException: File /user/cloudera/outputs/seed-1 does not exist or the user running Flink ('yarn') has insufficient permissions to access it.
    at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:106)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:390)
    at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:51)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:146)
    ... 23 more


How must I place the files in HDFS?
Is quickstart.cloudera:50075/home/cloudera/output the right location?

2015-06-04 16:51 GMT+02:00 Robert Metzger <[hidden email]>:
Once you've started the YARN session, you can submit a Flink job with "./bin/flink run <pathToYourJar>".

The jar file of your job doesn't need to be in HDFS. It has to be on the local file system, and Flink will ship it to all machines.

On Thu, Jun 4, 2015 at 4:48 PM, Pa Rö <[hidden email]> wrote:
Okay, it now runs on my Hadoop setup.
How can I start my Flink job? And where must the jar file be saved, in HDFS or on the local file system?

2015-06-04 16:31 GMT+02:00 Robert Metzger <[hidden email]>:
Yes, you have to run these commands in the command line of the Cloudera VM.

On Thu, Jun 4, 2015 at 4:28 PM, Pa Rö <[hidden email]> wrote:
You mean I should run these commands in a terminal/shell, and not define a Hue job?

2015-06-04 16:25 GMT+02:00 Robert Metzger <[hidden email]>:
It should certainly be possible to run Flink on a Cloudera Live VM.

I think these are the commands you need to execute:

wget http://stratosphere-bin.s3-website-us-east-1.amazonaws.com/flink-0.9-SNAPSHOT-bin-hadoop2.tgz
tar xvzf flink-0.9-SNAPSHOT-bin-hadoop2.tgz
cd flink-0.9-SNAPSHOT/
export HADOOP_CONF_DIR=/usr/lib/hadoop/etc/hadoop/
./bin/yarn-session.sh -n 1 -jm 1024 -tm 1024

If that is not working for you, please post the exact error message you are getting and I can help you to get it to run.


On Thu, Jun 4, 2015 at 4:18 PM, Pa Rö <[hidden email]> wrote:
Hi Robert,

I think the problem is the Hue API. I had the same problem with the Spark submit script, but the new Hue release has a Spark submit API.

I asked the group about the same problem with Spark and got no reply.

I want to test my app on a local cluster before I run it on the big cluster; for that I use Cloudera Live. Maybe there is another way to test Flink on a local single-node cluster VM?

2015-06-04 16:12 GMT+02:00 Robert Metzger <[hidden email]>:
Hi Paul,

why did running Flink from the regular scripts not work for you?

I'm not an expert on Hue, I would recommend asking in the Hue user forum / mailing list: https://groups.google.com/a/cloudera.org/forum/#!forum/hue-user.

On Thu, Jun 4, 2015 at 4:09 PM, Pa Rö <[hidden email]> wrote:
Thanks. Now I want to run my app on the single-node Cloudera Live VM.
How can I define my Flink job with Hue?
I tried to run the Flink script from HDFS, but it doesn't work.

best regards,
paul

2015-06-02 14:50 GMT+02:00 Robert Metzger <[hidden email]>:
I would recommend using HDFS.
For that, you need to specify the paths like this: hdfs:///path/to/data.
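The `hdfs:///` form (with an empty authority) resolves against the default file system configured in the cluster's core-site.xml. As a minimal sketch, the URI forms that come up in this thread compare as follows; the `quickstart.cloudera:8020` host and port are assumptions based on the Cloudera quickstart VM's defaults, not something stated in the thread:

```shell
# Three ways to write the same data path; only the hdfs:// forms reach HDFS.
# quickstart.cloudera:8020 is an assumed NameNode address (quickstart VM default).
LOCAL="file:///home/cloudera/inputs/data.csv"                  # local disk of each machine
HDFS_DEFAULT="hdfs:///user/cloudera/inputs/data.csv"           # default NameNode from core-site.xml
HDFS_EXPLICIT="hdfs://quickstart.cloudera:8020/user/cloudera/inputs/data.csv"
echo "$HDFS_DEFAULT"   # -> hdfs:///user/cloudera/inputs/data.csv
```

With the empty authority, the NameNode address comes from the Hadoop configuration, so the same path string can work across clusters (assuming HADOOP_CONF_DIR points at the right config).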

On Tue, Jun 2, 2015 at 2:48 PM, Pa Rö <[hidden email]> wrote:
Nice.

Which file system API must I use for the cluster: java.io, hadoop.fs, or Flink's?

2015-06-02 14:29 GMT+02:00 Robert Metzger <[hidden email]>:
Hi,
you can start Flink on YARN on the Cloudera distribution.


These are the commands you need to execute
wget http://stratosphere-bin.s3-website-us-east-1.amazonaws.com/flink-0.9-SNAPSHOT-bin-hadoop2.tgz
tar xvzf flink-0.9-SNAPSHOT-bin-hadoop2.tgz
cd flink-0.9-SNAPSHOT/
./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096




On Tue, Jun 2, 2015 at 2:03 PM, Pa Rö <[hidden email]> wrote:
Hi community,

I want to test my Flink k-means implementation on a Hadoop cluster. I use the Cloudera Live distribution. How can I run Flink on this cluster? Are the Java dependencies alone perhaps enough?

best regards,
paul


Re: flink k-means on hadoop cluster

Pa Rö
Hi Robert,

I saw that you answered me on Stack Overflow, thanks. Now the path is right, but I get the old exception:
org.apache.flink.runtime.JobException: Creating the input splits caused an error: File file:/127.0.0.1:8020/home/user/cloudera/outputs/seed-1 does not exist or the user running Flink ('yarn') has insufficient permissions to access it.

I looked at HDFS and wanted to give the user yarn all permissions:
[cloudera@quickstart bin]$ hdfs dfs -ls
Found 9 items
drwxrwxrwt   - cloudera cloudera          0 2015-06-03 04:24 .Trash
drwxrwxrwt   - cloudera cloudera          0 2015-06-08 01:17 .flink
drwxrwxrwt   - cloudera cloudera          0 2015-06-04 06:51 .staging
drwxrwxrwt   - cloudera cloudera          0 2015-02-17 08:33 gdelt
drwxrwxrwt   - cloudera cloudera          0 2015-06-02 06:42 inputs
-rwxrwxrwt   1 cloudera cloudera   31223141 2015-06-03 03:53 ma-mahout.jar
-rwxrwxrwt   1 cloudera cloudera   30037418 2015-06-03 03:53 ma-mapreduce.jar
drwxrwxrwt   - cloudera cloudera          0 2015-06-04 07:38 oozie-oozi
drwxrwxrwt   - cloudera cloudera          0 2015-06-03 03:59 outputs
[cloudera@quickstart bin]$ sudo hdfs dfs -chown -R yarn:hadoop inputs
chown: `inputs': No such file or directory
[cloudera@quickstart bin]$ sudo hdfs dfs -chown -R yarn:hadoop outputs
chown: `outputs': No such file or directory

I am doing something wrong; maybe you have an idea?


Re: flink k-means on hadoop cluster

Till Rohrmann-2

I assume that the paths inputs and outputs are not correct, since you get the error message chown: `inputs': No such file or directory. Try to provide the full path in the chown command, such as hdfs://ServerURI/path/to/your/directory.
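A likely reason the relative paths failed: under sudo the command runs as root, and relative HDFS paths resolve against the calling user's HDFS home directory (/user/root), which contains neither inputs nor outputs. A hedged sketch of the fix, assuming the quickstart VM's /user/cloudera layout and the hdfs superuser:

```shell
# Build absolute HDFS paths instead of relying on the implicit home
# directory, which changes to /user/root when the command runs under sudo.
# HDFS_USER and the /user/... layout are assumptions for the quickstart VM.
HDFS_USER="cloudera"
for d in inputs outputs; do
  target="/user/${HDFS_USER}/${d}"
  echo "${target}"
  # On the VM, the actual command would be:
  #   sudo -u hdfs hdfs dfs -chown -R yarn:hadoop "${target}"
done
```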



Re: flink k-means on hadoop cluster

Pa Rö
It works; I have now set the permissions for the yarn user,
but my Flink app cannot find the path. I tried the following path and get the same exception:
file:///127.0.0.1:8020/user/cloudera/inputs/

How must I set the HDFS path?





Re: flink k-means on hadoop cluster

Till Rohrmann-2

hdfs://ServerURI:8020/user/cloudera/inputs should do the trick
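The failing URI mixed the file:// scheme with a NameNode host:port, which is why Flink treated it as a LocalFileSystem path. As a small sketch of the correction (127.0.0.1:8020 comes from the earlier log; whether it is the reachable NameNode address is an assumption):

```shell
# The submitted path used the file:// scheme in front of a NameNode address.
# Swapping the scheme produces the hdfs://host:port/path form suggested above.
BAD="file:///127.0.0.1:8020/user/cloudera/inputs/"
GOOD="hdfs://${BAD#file:///}"   # strip the file:/// prefix, prepend hdfs://
echo "$GOOD"                    # -> hdfs://127.0.0.1:8020/user/cloudera/inputs/
```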



