Hi,

I am trying to run a few Storm topologies on Flink, and I have a question about the documentation. Can anyone tell me which of the two pages below is correct?

https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/storm_compatibility.html
https://ci.apache.org/projects/flink/flink-docs-master/apis/storm_compatibility.html

I want to use flink-storm 1.0-SNAPSHOT, but I don't see any createTopology method in the FlinkTopology class. For example:

    cluster.submitTopology("WordCount", conf, FlinkTopology.createTopology(builder));

Is the documentation incorrect for 1.0-SNAPSHOT, or am I missing something? ;)

Thanks,
Naveen

The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer. |
Hi Naveen,
I think you're not using the latest 1.0-SNAPSHOT. Did you build from source? If so, you need to build again because the snapshot API has been updated recently.

Best regards,
Max |
Thanks Max. I was able to get past the compilation error after building it from source.

I am trying to run a simple word count topology in Storm and compare its output with Flink's. The topology is read -> split -> count -> print on console. The code is at https://github.com/naveenmadhire/flink-storm-example.

When I run the WordCountTopologyFlink.java program, I don't see any messages on the console. I modified this class in the way described in the Flink documentation. The detailed job log is at https://gist.github.com/naveenmadhire/be23d54ed14c5e41ab7c

When you get some time, could you please check why nothing is printed on the console in local mode?

Thanks,
Naveen |
Hi Naveen,
Were you using Maven before? It always takes a while for changes on master to be synced to Maven. The documentation happened to be updated before Maven synchronized. Building and installing manually (what you did) solves the problem.

Strangely, when I run your code on my machine with the latest 1.0-SNAPSHOT, I see a lot of output on my console.

Here's the output: https://gist.github.com/mxm/98cd927866b193ce0f89

Could you add a bolt that writes the Storm tuples to a file? Is that file also empty?

    builder.setBolt("file", new BoltFileSink("/tmp/storm", new OutputFormatter() {
        @Override
        public String format(Tuple tuple) {
            return tuple.toString();
        }
    }), 1).shuffleGrouping("count");

Thanks,
Max |
Hi Max,
Yeah, I did route the "count" bolt output to a file, and I see the output. The Storm and Flink outputs match.

However, I am not able to use the BoltFileSink class in the 1.0-SNAPSHOT that I built. I think it's better to wait a day for the Maven sync to happen so that I can use 1.0-SNAPSHOT directly as a dependency.

I have a few Storm topologies that I will try to run on Flink over the next few days. I will let you know how that goes. Thanks :)

Thanks,
Naveen |
Hi Max,
I forgot to include the flink-storm-examples dependency in the application, which is needed to use BoltFileSink.

However, the file created by the BoltFileSink is empty. Is there anything else I need to do to write to a file with BoltFileSink?

I am using the same code that you posted:

    builder.setBolt("file", new BoltFileSink("/tmp/storm", new OutputFormatter() {
        @Override
        public String format(Tuple tuple) {
            return tuple.toString();
        }
    }), 1).shuffleGrouping("count");

Thanks,
Naveen |
Hi Naveen,
in your previous mail you mention that

> Yeah, I did route the "count" bolt output to a file and I see the output.
> I can see the Storm and Flink output matching.

How did you do this? By modifying the "count bolt" code? Or did you use some other bolt that consumes the "count bolt" output?

One more thought: how much data do you have, and did you terminate your program before looking into the result file? I am asking because BoltFileSink uses a BufferedOutputWriter internally. If you have only a few records in your result and do not terminate, the data might still be buffered. It would get flushed to disk if you terminate the program.

Otherwise, I could not spot any issue with your code. And since Max mentioned that the console output worked for him using your program, I am a little puzzled about what might be going wrong in your setup. The program seems to be correct.

-Matthias |
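The buffering effect Matthias describes can be reproduced outside of Flink. The following is a minimal sketch, not the BoltFileSink implementation: it assumes only that the sink writes through a buffered writer, and uses the JDK's BufferedWriter as a stand-in. The class name BufferedSinkDemo and its method are hypothetical names chosen for illustration.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical demo class; stands in for any sink that buffers its output.
public class BufferedSinkDemo {

    /**
     * Writes one short record through a BufferedWriter and returns the file
     * size measured while the writer is still open and again after it is closed.
     */
    public static long[] sizesBeforeAndAfterClose() {
        try {
            Path file = Files.createTempFile("buffered-sink-demo", ".txt");
            try {
                long before;
                try (BufferedWriter writer =
                         Files.newBufferedWriter(file, StandardCharsets.UTF_8)) {
                    writer.write("hello 1\n");
                    // The record is far smaller than the buffer, so it is
                    // still held in memory at this point.
                    before = Files.size(file);
                }
                // Closing the writer flushes the buffer to the file.
                long after = Files.size(file);
                return new long[] {before, after};
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long[] sizes = sizesBeforeAndAfterClose();
        System.out.println("bytes on disk while writer is open: " + sizes[0]);
        System.out.println("bytes on disk after close:          " + sizes[1]);
    }
}
```

With only a handful of small records, a result file can therefore look empty until the writer is flushed or closed, which matches the situation where the job is still running when the file is inspected.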
Hi Matthias,

Sorry for the confusion. I just added some simple code to the count bolt to write the bolt output to a file; I was not using BoltFileSink.

    OutputStream o;
    try {
        o = new FileOutputStream("/tmp/wordcount.txt", true);
        o.write((word + " " + count.toString() + "\n").getBytes());
        o.close();
    } catch (IOException e) {
        e.printStackTrace();
    }

Coming to BoltFileSink, I tried calling cluster.shutdown at the end, which stops the local cluster, but I get the exception below:

    java.lang.Exception: TaskManager is shutting down.
        at org.apache.flink.runtime.taskmanager.TaskManager.postStop(TaskManager.scala:216)
        at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
        at org.apache.flink.runtime.taskmanager.TaskManager.aroundPostStop(TaskManager.scala:119)
        at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
        at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
        at akka.actor.ActorCell.terminate(ActorCell.scala:369)
        at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
        at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
        at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I added the lines below to stop the local cluster at the end; the code is the same as in flink-storm-examples.

    Utils.sleep(10 * 1000);
    cluster.shutdown();

Thanks,
Naveen |
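For reference, the ad-hoc file write in the count bolt can be written more defensively. This is a minimal sketch under stated assumptions, not code from the thread's repository: the class AppendCountDemo and the helper appendCount are hypothetical names, and a temporary file stands in for /tmp/wordcount.txt. It keeps the append-mode FileOutputStream constructor (second argument true) but uses try-with-resources so the stream is closed even when the write fails.

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Hypothetical demo class illustrating an append-mode write per tuple.
public class AppendCountDemo {

    /** Appends one "word count" line; the stream is closed even if write() throws. */
    public static void appendCount(Path file, String word, int count) {
        // The second constructor argument (true) opens the file in append mode.
        try (OutputStream out = new FileOutputStream(file.toFile(), true)) {
            out.write((word + " " + count + "\n").getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Appends two records to a temporary file and returns the lines read back. */
    public static List<String> demo() {
        try {
            Path file = Files.createTempFile("wordcount", ".txt");
            try {
                appendCount(file, "flink", 1);
                appendCount(file, "storm", 2);
                return Files.readAllLines(file, StandardCharsets.UTF_8);
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        for (String line : demo()) {
            System.out.println(line);
        }
    }
}
```

Opening and closing the file for every tuple is simple but slow; it is only reasonable as a debugging aid for small inputs like this word count.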
Hi Naveen,

In your code on GitHub, please remove the following from the WordCount file:

    OutputStream o;
    try {
        o = new FileOutputStream("/tmp/wordcount1.txt", true);
        o.write((word + " " + count.toString() + "\n").getBytes());
        o.close();
    } catch (IOException e) {
        e.printStackTrace();
    }

It is not necessary because you already have a bolt which prints to a file. What this code did was reopen the wordcount1.txt file and append to it on every incoming tuple.

You were not seeing console output because you didn't set up a log4j.properties file. Put the following in a file called log4j.properties in the "resources" folder, i.e. under src/main/resources:

    log4j.rootLogger=INFO, console

    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

Then you will also see console output. We will fix the submission code of Storm such that this won't be necessary in the future. By the way, the recommended template for Flink jobs on Storm is to start off with the Flink Quickstart project: https://ci.apache.org/projects/flink/flink-docs-release-0.10/quickstart/java_api_quickstart.html
This would already contain the log4j.properties file.

Best,
Max |
Hi Naveen,

It turns out I had changed the pom.xml after I checked out your code while trying to get your example working. I have now found the real cause of your problem. Please make sure you have the following dependency in your pom.xml (in addition to the storm modules):

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>

The quickstart also contains this. It shouldn't be necessary, but it's a workaround for a bug which we just discovered with your help. Thank you for reporting!

Best regards,
Max
|
Hi Naveen,
just for completeness: Max fixed this bug today, and we also updated the documentation.

As you are using the SNAPSHOT version, you do not need to include "flink-java" any more once you update to the latest version containing the fix.

Furthermore, *do not* include "storm-core" as a dependency. This will result in a Kryo problem due to a Flink/Storm Kryo version conflict. (The dependency is not needed anyway, as you get it automatically via "flink-storm-examples" or "flink-storm".)

This Kryo version conflict was the problem in the first place. It resulted in a Kryo exception when running your program for longer than 10 seconds. As you stopped after 10 seconds, you did not see the exception, just an empty result file :/

-Matthias |