Hello,
I have some questions regarding how to run one of the flink-storm-examples, the WordCountTopology. How should I run the job? On GitHub it says I should just execute bin/flink run example.jar, but when I execute:

bin/flink run WordCount-StormTopology.jar

nothing happens. What am I doing wrong, and how can I run the WordCountTopology via StormWordCountRemoteBySubmitter?

Also, why did you guys get rid of the KafkaSource class? What is the API now for subscribing to a Kafka source?

Best,
Jerry
Concerning the KafkaSource, please use the "FlinkKafkaConsumer". It's the new and better KafkaSource.

On 01.09.2015 21:40, "Jerry Peng" <[hidden email]> wrote:
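For reference, a minimal sketch of what consuming from Kafka looks like with the new consumer. This is an illustrative sketch, not code from the thread: it assumes a Flink 0.9.x-era setup where the version-specific class is named FlinkKafkaConsumer082, and the topic name, broker, and ZooKeeper addresses are placeholders. Exact class and package names shifted between the 0.9 and 0.10 lines, so check the connector module of your version.

```java
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaReadSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder connection settings -- adjust to your cluster.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("zookeeper.connect", "localhost:2181");
        props.setProperty("group.id", "test-group");

        // Replaces the old KafkaSource: subscribe to a topic
        // and get the records back as a DataStream.
        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer082<>("my-topic",
                        new SimpleStringSchema(), props));

        stream.print();
        env.execute("Kafka read sketch");
    }
}
```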
Hi Jerry,
WordCount-StormTopology uses a hard-coded dop (degree of parallelism) of 4. If you start up Flink in local mode (bin/start-local-streaming.sh), you need to increase the number of task slots to at least 4 in conf/flink-conf.yaml before starting Flink -> taskmanager.numberOfTaskSlots

You should actually see the following exception in log/flink-...-jobmanager-...log:

> NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration.

WordCount-StormTopology does use StormWordCountRemoteBySubmitter internally. So, you use it already ;)

I am not sure what you mean by "get rid of KafkaSource"? It is still in the code base. Which version do you use? In flink-0.10-SNAPSHOT it is located in submodule "flink-connector-kafka" (which is a submodule of "flink-streaming-connector-parent", which is in turn a submodule of "flink-streaming-parent").

-Matthias
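Concretely, the slot setting is a single line in the configuration file. This is a minimal fragment of conf/flink-conf.yaml; 4 matches the example's hard-coded parallelism, and a larger value also works:

```
# conf/flink-conf.yaml -- give the local TaskManager enough
# slots for the dop-4 example (default is 1)
taskmanager.numberOfTaskSlots: 4
```

The setting takes effect on the next start of the TaskManager, so restart Flink after editing it.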
When I run WordCount-StormTopology I get the following exception:

~/flink/bin/flink run WordCount-StormTopology.jar hdfs:///home/jerrypeng/hadoop/hadoop_dir/data/data.txt hdfs:///home/jerrypeng/hadoop/hadoop_dir/data/results.txt

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:452)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
	at org.apache.flink.client.program.Client.run(Client.java:278)
	at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:631)
	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:319)
	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:954)
	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1004)
Caused by: NotAliveException(msg:null)
	at org.apache.flink.stormcompatibility.api.FlinkClient.killTopologyWithOpts(FlinkClient.java:209)
	at org.apache.flink.stormcompatibility.api.FlinkClient.killTopology(FlinkClient.java:203)
	at org.apache.flink.stormcompatibility.wordcount.StormWordCountRemoteBySubmitter.main(StormWordCountRemoteBySubmitter.java:80)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:483)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
	... 6 more

The exception above occurred while trying to run your command.

Any idea how to fix this?
Yes. That is what I expected.
The JobManager cannot start the job due to too few task slots. It logs the exception NoResourceAvailableException (it is not shown on stdout; see the "log" folder). There is no feedback to the Flink CLI that the job could not be started.

Furthermore, WordCount-StormTopology sleeps for 5 seconds and then tries to "kill" the job. However, because the job was never started, there is a NotAliveException, which is printed to stdout.

-Matthias
Hello,

I corrected the number of slots for each task manager, but now when I try to run the WordCount-StormTopology, the JobManager daemon on my master node crashes and I get this exception in the log:

java.lang.Exception: Received a message CancelJob(6a4b9aa01ec87db20060210e5b36065e) without a leader session ID, even though the message requires a leader session ID.
	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:41)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:104)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

It seems to have something to do with the canceling of the topology after the sleep. Any ideas?

Best,
Jerry
Oh yes, I forgot about this. I already have a fix for it in a pending pull request... I hope that this PR is merged soon.

If you want to observe the progress, look here:
https://issues.apache.org/jira/browse/FLINK-2111
and
https://issues.apache.org/jira/browse/FLINK-2338

This PR resolves both and fixes the problem you observed:
https://github.com/apache/flink/pull/750

-Matthias
You can use "bin/flink cancel JOBID" or the JobManager WebUI to cancel the running job.

The exception you see occurs in FlinkSubmitter.killTopology(...), which is not used by "bin/flink cancel" or the JobManager WebUI.

If you compile the example yourself, just remove the call to killTopology().

-Matthias
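The CLI workflow for cancelling looks roughly like this. These commands are illustrative only: they assume a running Flink installation with the job submitted, and the job ID shown is a placeholder (use the ID printed by "bin/flink list" or shown in the WebUI):

```
# List running jobs to find the JOBID
bin/flink list

# Cancel the job by its ID (placeholder ID shown)
bin/flink cancel 6a4b9aa01ec87db20060210e5b36065e
```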
Ya, that's what I did and everything seems to execute fine, but when I try to run the WordCount-StormTopology with a file on HDFS I get a java.io.FileNotFoundException:

java.lang.RuntimeException: java.io.FileNotFoundException: /home/jerrypeng/hadoop/hadoop_dir/data/data.txt (No such file or directory)
	at org.apache.flink.stormcompatibility.util.StormFileSpout.open(StormFileSpout.java:50)
	at org.apache.flink.stormcompatibility.wrappers.AbstractStormSpoutWrapper.run(AbstractStormSpoutWrapper.java:102)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:58)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:172)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: /home/jerrypeng/hadoop/hadoop_dir/data/data.txt (No such file or directory)
	at java.io.FileInputStream.open(Native Method)
	at java.io.FileInputStream.<init>(FileInputStream.java:138)
	at java.io.FileInputStream.<init>(FileInputStream.java:93)
	at java.io.FileReader.<init>(FileReader.java:58)
	at org.apache.flink.stormcompatibility.util.StormFileSpout.open(StormFileSpout.java:48)

However, I have that file in my HDFS namespace:

$ hadoop fs -ls -R /
15/09/01 21:25:11 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
drwxr-xr-x   - jerrypeng supergroup      0 2015-08-21 14:40 /home
drwxr-xr-x   - jerrypeng supergroup      0 2015-08-21 14:40 /home/jerrypeng
drwxr-xr-x   - jerrypeng supergroup      0 2015-08-21 14:41 /home/jerrypeng/hadoop
drwxr-xr-x   - jerrypeng supergroup      0 2015-08-21 14:40 /home/jerrypeng/hadoop/dir
drwxr-xr-x   - jerrypeng supergroup      0 2015-08-24 16:06 /home/jerrypeng/hadoop/hadoop_dir
drwxr-xr-x   - jerrypeng supergroup      0 2015-09-01 20:48 /home/jerrypeng/hadoop/hadoop_dir/data
-rw-r--r--   3 jerrypeng supergroup  18552 2015-09-01 19:18 /home/jerrypeng/hadoop/hadoop_dir/data/data.txt
-rw-r--r--   3 jerrypeng supergroup      0 2015-09-01 20:48 /home/jerrypeng/hadoop/hadoop_dir/data/result.txt
drwxr-xr-x   - jerrypeng supergroup      0 2015-08-21 14:41 /home/jerrypeng/hadoop/hadoop_dir/dir1
drwxr-xr-x   - jerrypeng supergroup      0 2015-08-24 15:59 /home/jerrypeng/hadoop/hadoop_dir/test
-rw-r--r--   3 jerrypeng supergroup     32 2015-08-24 15:59 /home/jerrypeng/hadoop/hadoop_dir/test/filename.txt

Any idea what's going on?
Hi Jerry,

Unfortunately, it seems that the StormFileSpout can only read files from a local filesystem, not from HDFS. Maybe Matthias has something in the works for that.

Regards,
Aljoscha
Hi,
StormFileSpout uses a simple FileReader internally an cannot deal with HDFS. It would be a nice extension to have. I just opened a JIRA for it: https://issues.apache.org/jira/browse/FLINK-2606 Jerry, feel to work in this feature and contribute code to Flink ;) -Matthias On 09/02/2015 07:52 AM, Aljoscha Krettek wrote: > Hi Jerry, > unfortunately, it seems that the StormFileSpout can only read files from > a local filesystem, not from HDFS. Maybe Matthias has something in the > works for that. > > Regards, > Aljoscha > > On Tue, 1 Sep 2015 at 23:33 Jerry Peng <[hidden email] > <mailto:[hidden email]>> wrote: > > Ya that what I did and everything seems execute fine but when I try > to run the WordCount-StormTopology with a file on hfs I get > a java.io.FileNotFoundException : > > java.lang.RuntimeException: java.io.FileNotFoundException: > /home/jerrypeng/hadoop/hadoop_dir/data/data.txt (No such file or > directory) > > at > org.apache.flink.stormcompatibility.util.StormFileSpout.open(StormFileSpout.java:50) > > at > org.apache.flink.stormcompatibility.wrappers.AbstractStormSpoutWrapper.run(AbstractStormSpoutWrapper.java:102) > > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57) > > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:58) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:172) > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581) > > at java.lang.Thread.run(Thread.java:745) > > Caused by: java.io.FileNotFoundException: > /home/jerrypeng/hadoop/hadoop_dir/data/data.txt (No such file or > directory) > > at java.io.FileInputStream.open(Native Method) > > at java.io.FileInputStream.<init>(FileInputStream.java:138) > > at java.io.FileInputStream.<init>(FileInputStream.java:93) > > at java.io.FileReader.<init>(FileReader.java:58) > > at > org.apache.flink.stormcompatibility.util.StormFileSpout.open(StormFileSpout.java:48) > > > > However I 
have that file on my hdfs namespace: > > > $ hadoop fs -ls -R / > > 15/09/01 21:25:11 WARN util.NativeCodeLoader: Unable to load > native-hadoop library for your platform... using builtin-java > classes where applicable > > drwxr-xr-x - jerrypeng supergroup 0 2015-08-21 14:40 /home > > drwxr-xr-x - jerrypeng supergroup 0 2015-08-21 14:40 > /home/jerrypeng > > drwxr-xr-x - jerrypeng supergroup 0 2015-08-21 14:41 > /home/jerrypeng/hadoop > > drwxr-xr-x - jerrypeng supergroup 0 2015-08-21 14:40 > /home/jerrypeng/hadoop/dir > > drwxr-xr-x - jerrypeng supergroup 0 2015-08-24 16:06 > /home/jerrypeng/hadoop/hadoop_dir > > drwxr-xr-x - jerrypeng supergroup 0 2015-09-01 20:48 > /home/jerrypeng/hadoop/hadoop_dir/data > > -rw-r--r-- 3 jerrypeng supergroup 18552 2015-09-01 19:18 > /home/jerrypeng/hadoop/hadoop_dir/data/data.txt > > -rw-r--r-- 3 jerrypeng supergroup 0 2015-09-01 20:48 > /home/jerrypeng/hadoop/hadoop_dir/data/result.txt > > drwxr-xr-x - jerrypeng supergroup 0 2015-08-21 14:41 > /home/jerrypeng/hadoop/hadoop_dir/dir1 > > drwxr-xr-x - jerrypeng supergroup 0 2015-08-24 15:59 > /home/jerrypeng/hadoop/hadoop_dir/test > > -rw-r--r-- 3 jerrypeng supergroup 32 2015-08-24 15:59 > /home/jerrypeng/hadoop/hadoop_dir/test/filename.txt > > > Any idea what's going on? > > > On Tue, Sep 1, 2015 at 4:20 PM, Matthias J. Sax > <[hidden email] > <mailto:[hidden email]>> wrote: > > You can use "bin/flink cancel JOBID" or JobManager WebUI to > cancel the > running job. > > The exception you see, occurs in > FlinkSubmitter.killTopology(...) which > is not used by "bin/flink cancel" or JobMaanger WebUI. > > If you compile the example you yourself, just remove the call to > killTopology(). > > -Matthias > > On 09/01/2015 11:16 PM, Matthias J. Sax wrote: > > Oh yes. I forgot about this. I have already a fix for it in a > pending > > pull request... I hope that this PR is merged soon... 
> > > > If you want to observe the progress, look here: > > https://issues.apache.org/jira/browse/FLINK-2111 > > and > > https://issues.apache.org/jira/browse/FLINK-2338 > > > > This PR, resolves both and fixed the problem you observed: > > https://github.com/apache/flink/pull/750 > > > > -Matthias > > > > > > On 09/01/2015 11:09 PM, Jerry Peng wrote: > >> Hello, > >> > >> I corrected the number of slots for each task manager but now > when I try > >> to run the WordCount-StormTopology, the job manager daemon on > my master > >> node crashes and I get this exception in the log: > >> > >> java.lang.Exception: Received a message > >> CancelJob(6a4b9aa01ec87db20060210e5b36065e) without a leader > session ID, > >> even though the message requires a leader session ID. > >> > >> at > >> > org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:41) > >> > >> at > >> > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > >> > >> at > >> > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > >> > >> at > >> > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > >> > >> at > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) > >> > >> at > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) > >> > >> at > scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > >> > >> at > >> > org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28) > >> > >> at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > >> > >> at > >> > org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:104) > >> > >> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > >> > >> at akka.actor.ActorCell.invoke(ActorCell.scala:487) > >> > >> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) > >> > >> at 
>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>> It seems to have something to do with canceling the topology after
>>> the sleep. Any ideas?
>>>
>>> Best,
>>>
>>> Jerry
>>>
>>> On Tue, Sep 1, 2015 at 3:33 PM, Matthias J. Sax <[hidden email]> wrote:
>>>
>>>> Yes. That is what I expected.
>>>>
>>>> The JobManager cannot start the job due to too few task slots. It logs the
>>>> exception NoResourceAvailableException (it is not shown in stdout; see
>>>> the "log" folder). There is no feedback to the Flink CLI that the job
>>>> could not be started.
>>>>
>>>> Furthermore, WordCount-StormTopology sleeps for 5 seconds and then tries
>>>> to "kill" the job. However, because the job was never started, there is a
>>>> NotAliveException, which is printed to stdout.
>>>>
>>>> -Matthias
>>>>
>>>> On 09/01/2015 10:26 PM, Jerry Peng wrote:
>>>>> When I run WordCount-StormTopology I get the following exception:
>>>>>
>>>>> ~/flink/bin/flink run WordCount-StormTopology.jar
>>>>> hdfs:///home/jerrypeng/hadoop/hadoop_dir/data/data.txt
>>>>> hdfs:///home/jerrypeng/hadoop/hadoop_dir/data/results.txt
>>>>>
>>>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>>>> method caused an error.
>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:452)
>>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
>>>>>     at org.apache.flink.client.program.Client.run(Client.java:278)
>>>>>     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:631)
>>>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:319)
>>>>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:954)
>>>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1004)
>>>>> Caused by: NotAliveException(msg:null)
>>>>>     at org.apache.flink.stormcompatibility.api.FlinkClient.killTopologyWithOpts(FlinkClient.java:209)
>>>>>     at org.apache.flink.stormcompatibility.api.FlinkClient.killTopology(FlinkClient.java:203)
>>>>>     at org.apache.flink.stormcompatibility.wordcount.StormWordCountRemoteBySubmitter.main(StormWordCountRemoteBySubmitter.java:80)
>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>     at java.lang.reflect.Method.invoke(Method.java:483)
>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
>>>>>     ... 6 more
>>>>>
>>>>> The exception above occurred while trying to run your command.
>>>>>
>>>>> Any idea how to fix this?
>>>>>
>>>>> On Tue, Sep 1, 2015 at 3:10 PM, Matthias J.
>>>>> Sax <[hidden email]> wrote: [...]
One more remark that just came to my mind. There is a storm-hdfs module
available: https://github.com/apache/storm/tree/master/external/storm-hdfs

Maybe you can use it. It would be great if you could give feedback on whether it works for you.

-Matthias

On 09/02/2015 10:52 AM, Matthias J. Sax wrote:
> Hi,
> StormFileSpout uses a simple FileReader internally and cannot deal with
> HDFS. It would be a nice extension to have. I just opened a JIRA for it:
> https://issues.apache.org/jira/browse/FLINK-2606
>
> Jerry, feel free to work on this feature and contribute code to Flink ;)
>
> -Matthias
>
> On 09/02/2015 07:52 AM, Aljoscha Krettek wrote:
>> Hi Jerry,
>> unfortunately, it seems that the StormFileSpout can only read files from
>> a local filesystem, not from HDFS. Maybe Matthias has something in the
>> works for that.
>>
>> Regards,
>> Aljoscha
>>
>> On Tue, 1 Sep 2015 at 23:33 Jerry Peng <[hidden email]> wrote:
>>
>>> Ya, that's what I did, and everything seems to execute fine, but when I
>>> try to run the WordCount-StormTopology with a file on HDFS I get
>>> a java.io.FileNotFoundException:
>>>
>>> java.lang.RuntimeException: java.io.FileNotFoundException:
>>> /home/jerrypeng/hadoop/hadoop_dir/data/data.txt (No such file or directory)
>>>     at org.apache.flink.stormcompatibility.util.StormFileSpout.open(StormFileSpout.java:50)
>>>     at org.apache.flink.stormcompatibility.wrappers.AbstractStormSpoutWrapper.run(AbstractStormSpoutWrapper.java:102)
>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:58)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:172)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
>>>     at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.io.FileNotFoundException:
>>> /home/jerrypeng/hadoop/hadoop_dir/data/data.txt (No such file or directory)
>>>     at java.io.FileInputStream.open(Native Method)
>>>     at java.io.FileInputStream.<init>(FileInputStream.java:138)
>>>     at java.io.FileInputStream.<init>(FileInputStream.java:93)
>>>     at java.io.FileReader.<init>(FileReader.java:58)
>>>     at org.apache.flink.stormcompatibility.util.StormFileSpout.open(StormFileSpout.java:48)
>>>
>>> However I have that file on my hdfs namespace:
>>> [...]
>>>
>>> Any idea what's going on?
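A note on the FileNotFoundException in this thread: java.io.FileReader, which StormFileSpout uses internally, only resolves paths against the local filesystem, so an hdfs:// URI is treated as a non-existent local path. A minimal, self-contained sketch of that failure mode (the class name is illustrative; the path is the one from the thread and is not expected to exist locally):

```java
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;

public class LocalReaderVsHdfsUri {
    public static void main(String[] args) {
        // A path that may exist on HDFS but not on the local filesystem.
        String hdfsUri = "hdfs:///home/jerrypeng/hadoop/hadoop_dir/data/data.txt";
        try (FileReader reader = new FileReader(hdfsUri)) {
            System.out.println("opened (unexpected)");
        } catch (FileNotFoundException e) {
            // This is what StormFileSpout.open() runs into: FileReader
            // sees "hdfs:///..." as a plain (non-existent) local path.
            System.out.println("FileNotFoundException");
        } catch (IOException e) {
            System.out.println("IOException");
        }
    }
}
```

Making the spout HDFS-aware would mean opening the path through Hadoop's FileSystem API instead of FileReader, which is essentially what FLINK-2606 proposes.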