Flink Storm


Flink Storm

Madhire, Naveen
Hi,

I am trying to execute a few Storm topologies using Flink, and I have a question related to the documentation.

Can anyone tell me which of the code below is correct?

https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/storm_compatibility.html

https://ci.apache.org/projects/flink/flink-docs-master/apis/storm_compatibility.html

I want to use the flink-storm 1.0-SNAPSHOT version, but I don't see any createTopology method in the FlinkTopology class.

For example: cluster.submitTopology("WordCount", conf, FlinkTopology.createTopology(builder));

Is the documentation incorrect for 1.0-SNAPSHOT, or maybe am I missing something? ;)

Thanks,
Naveen


The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.


Re: Flink Storm

Maximilian Michels
Hi Naveen,

I think you're not using the latest 1.0-SNAPSHOT. Did you build from
source? If so, you need to build again because the snapshot API has
been updated recently.

Best regards,
Max

Re: Flink Storm

Madhire, Naveen
Thanks Max. I was able to get through the compilation error after building
it from source.

I am trying to run a simple word count topology in Storm and want to compare
it with Flink and see how the output comes out.

I am running a simple word count Storm topology of read -> split -> count
-> print on console.

The code is present at
https://github.com/naveenmadhire/flink-storm-example. When I run the
WordCountTopologyFlink.java program, I don't see any messages on the
console. I modified this class in the same way as mentioned in the
Flink documentation.



The detailed job log is at
https://gist.github.com/naveenmadhire/be23d54ed14c5e41ab7c


When you get some time, can you please check to see why it is not printing
anything on the console in local mode.


Thanks,
Naveen






Re: Flink Storm

Maximilian Michels
Hi Naveen,

Were you using Maven before? The syncing of changes in the master
always takes a while for Maven. The documentation happened to be
updated before Maven synchronized. Building and installing manually
(what you did) solves the problem.

Strangely, when I run your code on my machine with the latest
1.0-SNAPSHOT I see a lot of output on my console.

Here's the output: https://gist.github.com/mxm/98cd927866b193ce0f89

Could you add a bolt which writes the Storm tuples to a file? Is that
file also empty?

builder.setBolt("file", new BoltFileSink("/tmp/storm", new OutputFormatter() {
   @Override
   public String format(Tuple tuple) {
      return tuple.toString();
   }
}), 1).shuffleGrouping("count");


Thanks,
Max

Re: Flink Storm

Madhire, Naveen
Hi Max,

Yeah, I did route the "count" bolt output to a file and I see the output.
I can see the Storm and Flink output matching.

However, I am not able to use the BoltFileSink class in the 1.0-SNAPSHOT
which I built. I think it's better to wait for a day for the Maven sync to
happen so that I can directly use 1.0-SNAPSHOT in the dependency.

I have a few Storm topologies, which I will try to run on Flink over the
next few days. I will let you know how that goes. Thanks :)


Thanks,
Naveen



Re: Flink Storm

Madhire, Naveen
Hi Max,

I forgot to include the flink-storm-examples dependency in the application
to use BoltFileSink.

However, the file created by the BoltFileSink is empty. Is there anything
else I need to do to write to a file using BoltFileSink?

I am using the same code that you mentioned:

builder.setBolt("file", new BoltFileSink("/tmp/storm", new
OutputFormatter() {
   @Override
   public String format(Tuple tuple) {
      return tuple.toString();
   }
}), 1).shuffleGrouping("count");




Thanks,
Naveen






Re: Flink Storm

Matthias J. Sax-2
Hi Naveen,

in your previous mail you mention that

> Yeah, I did route the "count" bolt output to a file and I see the output.
> I can see the Storm and Flink output matching.

How did you do this? Modifying the "count bolt" code? Or did you use
some other bolt that consumes the "count bolt" output?

One more thought: how much data do you have, and did you terminate your
program before looking into the result file? I am asking because
BoltFileSink uses a BufferedOutputWriter internally -- if you have only
a few records in your result and do not terminate, the data might still
be buffered. It would get flushed to disk when you terminate the program.
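
The buffering effect described here can be seen with plain java.io alone. Below is a minimal, self-contained sketch (it does not use BoltFileSink itself, whose internals may differ) showing that a small record stays in a BufferedWriter's buffer and does not reach the file until close() flushes it:

```java
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class BufferDemo {
    // Writes one small record through a BufferedWriter and reports how many
    // bytes appeared on disk only after close() flushed the buffer.
    public static long bytesFlushedOnClose() throws IOException {
        Path tmp = Files.createTempFile("buffer-demo", ".txt");
        BufferedWriter writer = new BufferedWriter(new FileWriter(tmp.toFile()));
        writer.write("word 1\n");           // small record: fits in the buffer
        long before = Files.size(tmp);      // nothing has been flushed yet
        writer.close();                     // close() flushes the buffer to disk
        long after = Files.size(tmp);
        Files.delete(tmp);
        return after - before;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("bytes flushed on close: " + bytesFlushedOnClose());
    }
}
```

If the topology is killed before the sink's writer is closed, those buffered bytes are simply lost, which would match the empty result file.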

Otherwise, I could not spot any issue with your code. And as Max
mentioned that the console output worked for him using your program, I am
a little puzzled about what might go wrong in your setup. The program seems
to be correct.


-Matthias





Re: Flink Storm

Madhire, Naveen
Hi Matthias, sorry for the confusion. I just used a simple code snippet in
the count bolt to write the bolt output into a file, and was not using
BoltFileSink.

OutputStream o;
try {
    o = new FileOutputStream("/tmp/wordcount.txt", true);
    o.write((word + " " + count.toString() + "\n").getBytes());
    o.close();
} catch (IOException e) {
    e.printStackTrace();
}
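
As a side note, the second constructor argument in the snippet above is the append flag. A small self-contained sketch (with hypothetical record contents) showing that opening with append = true preserves earlier records across open/close cycles rather than truncating the file each time:

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class AppendDemo {
    // Mimics the per-tuple pattern: open the file, write one record, close.
    // Because append is true, each open() continues at the end of the file,
    // so all records written across separate open/close cycles survive.
    public static String writeTwice() throws IOException {
        Path tmp = Files.createTempFile("append-demo", ".txt");
        for (String record : new String[]{"storm 1\n", "flink 2\n"}) {
            FileOutputStream o = new FileOutputStream(tmp.toFile(), true);
            o.write(record.getBytes());
            o.close();                     // close() also flushes this record
        }
        String content = new String(Files.readAllBytes(tmp));
        Files.delete(tmp);
        return content;
    }

    public static void main(String[] args) throws IOException {
        System.out.print(writeTwice());
    }
}
```

Opening and closing the stream on every tuple is expensive, but it does mean each record hits the disk immediately, which is why this variant showed output while the buffered sink appeared empty.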




Coming to BoltFileSink, I tried calling cluster.shutdown() at the end, which
stops the local cluster, but I am getting the below exception:

java.lang.Exception: TaskManager is shutting down.
        at org.apache.flink.runtime.taskmanager.TaskManager.postStop(TaskManager.scala:216)
        at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
        at org.apache.flink.runtime.taskmanager.TaskManager.aroundPostStop(TaskManager.scala:119)
        at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
        at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
        at akka.actor.ActorCell.terminate(ActorCell.scala:369)
        at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
        at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
        at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



I added the below lines of code for stopping the local cluster at the end;
the code is the same as in the flink-storm-examples.

Utils.sleep(10 * 1000);

cluster.shutdown();




Thanks,
Naveen





Re: Flink Storm

Maximilian Michels
Hi Naveen,

In your code on GitHub, please remove the following from the WordCount file:

OutputStream o;
try {
    o = new FileOutputStream("/tmp/wordcount1.txt", true);
    o.write((word + " " + count.toString() + "\n").getBytes());
    o.close();
} catch (IOException e) {
    e.printStackTrace();
}

It is not necessary because you already have a bolt which prints to a file. What this code did was rewrite the wordcount1.txt file on every incoming tuple.

You were not seeing console output because you didn't set up a log4j.properties file. Put the following in a file called log4j.properties under src/main/resources:

log4j.rootLogger=INFO, console

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

Then you will also see console output. We will fix the submission code of Storm such that this won't be necessary in the future. By the way, the recommended template for Flink jobs on Storm is to start off with the Flink quickstart project: https://ci.apache.org/projects/flink/flink-docs-release-0.10/quickstart/java_api_quickstart.html It already contains the log4j.properties file.

Best,
Max




Re: Flink Storm

Maximilian Michels
Hi Naveen,

Turns out I had changed the pom.xml after I checked out your code while trying to get your example working. I have now found the real cause of your problem. Please make sure you have the following dependency in your pom.xml (in addition to the Storm modules):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>


The quickstart also contains this. It shouldn't be necessary, but it's a workaround for a bug which we just discovered with your help. Thank you for reporting!

Best regards,
Max

On Tue, Dec 8, 2015 at 2:56 PM, Maximilian Michels <[hidden email]> wrote:
Hi Naveen,

In your code on GitHub, please remove the following from the WordCount file:

OutputStream o;
try {
    o = new FileOutputStream("/tmp/wordcount1.txt", true);
    o.write((word + " " + count.toString() + "\n").getBytes());
    o.close();
} catch (IOException e) {
    e.printStackTrace();
}

It is not necessary because you already have a bolt which prints to a file. What this code did was open, write to, and close the wordcount1.txt file on every incoming tuple.
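As an aside, the reason a file sink can look empty while the topology is still running is buffering: data written through a buffered writer only reaches disk on flush or close. Here is a self-contained sketch of that effect using plain java.io/java.nio (no Flink or Storm classes; the class name is just for illustration):

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class BufferedSinkDemo {
    public static void main(String[] args) throws IOException {
        Path out = Files.createTempFile("wordcount", ".txt");
        // A buffered writer keeps small writes in memory until the buffer
        // fills up, flush() is called, or the writer is closed.
        BufferedWriter w = Files.newBufferedWriter(out, StandardCharsets.UTF_8);
        w.write("word 1\n"); // 7 bytes, far below the default buffer size
        // Nothing has reached the file yet:
        System.out.println("bytes before close: " + Files.size(out));
        w.close(); // close() flushes the buffer, so the data hits the file now
        System.out.println("bytes after close: " + Files.size(out));
        Files.deleteIfExists(out);
    }
}
```

This is why a result file written by a buffered sink may only show its contents once the program terminates and the writer is closed.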

You were not seeing console output because you didn't set up a log4j.properties file. Put the following in a file called log4j.properties under src/main/resources:

log4j.rootLogger=INFO, console

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

Then you will also see console output. We will fix the Storm submission code so that this won't be necessary in the future. By the way, the recommended template for Storm jobs on Flink is to start off with the Flink quickstart project: https://ci.apache.org/projects/flink/flink-docs-release-0.10/quickstart/java_api_quickstart.html It already contains the log4j.properties file.

Best,
Max


On Mon, Dec 7, 2015 at 11:05 PM, Madhire, Naveen <[hidden email]> wrote:
Hi Matthias, sorry for the confusion. I just used a simple piece of code in
the count bolt to write the bolt output into a file and was not using
BoltFileSink.

OutputStream o;
try {
    o = new FileOutputStream("/tmp/wordcount.txt", true);
    o.write((word + " " + count.toString() + "\n").getBytes());
    o.close();
} catch (IOException e) {
    e.printStackTrace();
}




Coming to BoltFileSink, I tried using cluster.shutdown() at the end, which
stops the local cluster, but I am getting the below exception:

java.lang.Exception: TaskManager is shutting down.
        at org.apache.flink.runtime.taskmanager.TaskManager.postStop(TaskManager.scala:216)
        at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
        at org.apache.flink.runtime.taskmanager.TaskManager.aroundPostStop(TaskManager.scala:119)
        at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
        at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
        at akka.actor.ActorCell.terminate(ActorCell.scala:369)
        at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
        at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
        at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



I added the below lines of code for stopping the local cluster at the end;
the code is the same as the flink-storm-examples one.

Utils.sleep(10 * 1000);

cluster.shutdown();




Thanks,
Naveen




On 12/5/15, 7:54 AM, "Matthias J. Sax" <[hidden email]> wrote:

>Hi Naveen,
>
>in your previous mail you mention that
>
>> Yeah, I did route the "count" bolt output to a file and I see the
>>output.
>> I can see the Storm and Flink output matching.
>
>How did you do this? Modifying the "count bolt" code? Or did you use
>some other bolt that consumes the "count bolt" output?
>
>One more thought: how much data do you have, and did you terminate your
>program before looking into the result file? I am asking because
>BoltFileSink uses a BufferedOutputWriter internally -- if you have only
>a few records in your result and do not terminate, the data might still
>be buffered. It would get flushed to disk if you terminate the program.
>
>Otherwise, I could not spot any issue with your code. And as Max
>mentioned that the console output worked for him using your program, I
>am a little puzzled about what might go wrong in your setup. The
>program seems to be correct.
>
>
>-Matthias
>
>
>On 12/04/2015 08:55 PM, Madhire, Naveen wrote:
>> Hi Max,
>>
>> I forgot to include the flink-storm-examples dependency in the
>> application to use BoltFileSink.
>>
>> However, the file created by the BoltFileSink is empty. Is there
>> anything else I need to do to write to a file using BoltFileSink?
>>
>> I am using the same code that you mentioned:
>>
>> builder.setBolt("file", new BoltFileSink("/tmp/storm", new
>> OutputFormatter() {
>>    @Override
>>    public String format(Tuple tuple) {
>>       return tuple.toString();
>>    }
>> }), 1).shuffleGrouping("count");
>>
>>
>>
>>
>> Thanks,
>> Naveen
>>
>>
>>
>>
>>>
>>> On 12/4/15, 5:36 AM, "Maximilian Michels" <[hidden email]> wrote:
>>>
>>>> Hi Naveen,
>>>>
>>>> Were you using Maven before? The syncing of changes in the master
>>>> always takes a while for Maven. The documentation happened to be
>>>> updated before Maven synchronized. Building and installing manually
>>>> (what you did) solves the problem.
>>>>
>>>> Strangely, when I run your code on my machine with the latest
>>>> 1.0-SNAPSHOT I see a lot of output on my console.
>>>>
>>>> Here's the output: https://gist.github.com/mxm/98cd927866b193ce0f89
>>>>
>>>> Could you add a bolt which writes the Storm tuples to a file? Is that
>>>> file also empty?
>>>>
>>>> builder.setBolt("file", new BoltFileSink("/tmp/storm", new
>>>> OutputFormatter() {
>>>>   @Override
>>>>   public String format(Tuple tuple) {
>>>>      return tuple.toString();
>>>>   }
>>>> }), 1).shuffleGrouping("count");
>>>>
>>>>
>>>> Thanks,
>>>> Max
>>>
>>
>




Re: Flink Storm

Matthias J. Sax-2
Hi Naveen,

just for completeness: Max fixed this bug today and we also updated the
documentation.

As you are using a SNAPSHOT version, you no longer need to include
"flink-java" once you update to the latest version containing the
fix.

Furthermore, *do not* include "storm-core" as a dependency -- this will
result in a Kryo problem due to a Flink/Storm Kryo version conflict.

(The dependency is not needed anyway, as you get it automatically via
"flink-storm-examples" or "flink-storm".)

This Kryo version conflict was the problem in the first place. It
resulted in a Kryo exception when running your program for longer than
10 seconds. As you stopped after 10 seconds, you did not see the
exception, just an empty result file :/

-Matthias



