Working with storm compatibility layer

Working with storm compatibility layer

Shinhyung Yang
Howdies to everyone,

I'm trying to use the Storm compatibility layer on Flink 0.10.1. The
original Storm topology works fine on Storm 0.9.5, and I have
incorporated the FlinkLocalCluster, FlinkTopologyBuilder, and
FlinkTopology classes according to the programming guide
(https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/storm_compatibility.html).
I'm running it on Oracle Java 8 (1.8.0_66-b17) on CentOS 7 (7.2.1511).
The program seems to get all the way to the submitTopology method
without any problem; however, it never invokes the open method of the
Spout class, the declareOutputFields method is called multiple times,
and the program exits silently. Do you have any idea what's going on
here, or any suggestions? If needed, please ask me for more
information.

Thank you for reading.
With best regards,
Shinhyung Yang

Re: Working with storm compatibility layer

Matthias J. Sax
Hello Shinhyung,

that sounds weird and should not happen -- Spout.open() should get
called exactly once. I am not sure about multiple calls to
declareOutputFields though -- it might be called multiple times -- I
would need to double check the code.

However, the call to declareOutputFields should be idempotent, so it
should actually not be a problem if it is called multiple times. Even if
Storm might call this method only once, there is no guarantee that it is
not called multiple times. If this is a problem for you, please let me
know. I think we could fix this and make sure the method is only called
once.
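
For example, a declareOutputFields that only declares a fixed schema is
idempotent -- calling it repeatedly is harmless (a minimal sketch, not
your actual code):

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // a fixed declaration with no side effects; safe to call repeatedly
    declarer.declare(new Fields("field1", "field2"));
}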

It would be helpful if you could share your code. What do you mean by
"exits silently"? No submission happens? Did you check the logs? As you
mentioned FlinkLocalCluster, I assume that you run within an IDE?

Btw: lately we fixed a couple of bugs. I would suggest that you use the
latest version from the Flink master branch. It should work with 0.10.1
without problems.
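
In case you go that route, the usual steps are roughly the following (a
sketch, assuming a standard Maven setup; you would then point your
pom.xml at the installed snapshot version):

git clone https://github.com/apache/flink.git
cd flink
mvn clean install -DskipTests   # installs the snapshot artifacts into ~/.m2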

-Matthias



Re: Working with storm compatibility layer

Shinhyung Yang
Dear Matthias,

Thank you for replying!

> that sounds weird and should not happen -- Spout.open() should get
> called exactly once.

That's what I thought too. I'm new to both Storm and Flink, so it's
quite complicated for me to handle both yet; would it help if I knew
Storm's lifecycle and Flink's lifecycle? When submitTopology() is
invoked, what should be called other than Spout.open()?

> I am not sure about multiple calls to
> declareOutputFields though -- it might be called multiple times -- I
> would need to double check the code.

I'll check my code too.

> However, the call to declareOutputFields should be idempotent, so it
> should actually not be a problem if it is called multiple times. Even if
> Storm might call this method only once, there is no guarantee that it is
> not called multiple times. If this is a problem for you, please let me
> know. I think we could fix this and make sure the method is only called
> once.

Actually it doesn't seem to be a problem for now. It just does the same
job multiple times.

> It would be helpful if you could share your code. What do you mean by
> "exits silently"? No submission happens? Did you check the logs? As you
> mentioned FlinkLocalCluster, I assume that you run within an IDE?

The topology doesn't seem to continue. There's a set of initialization
code in the open method of the program's spout, and it looks hopeless if
that is never invoked. Is there any way to check the logs other than
using println() calls? I'm running it on the command line, with
`bin/start-local.sh' running in the background, and submitting via
`bin/flink run'.

> Btw: lately we fixed a couple of bugs. I would suggest that you use the
> latest version from the Flink master branch. It should work with 0.10.1
> without problems.

It was very tedious for me to deal with the pom.xml file and the .m2
repository, so I preferred to use Maven Central. But I will try the
master branch if I have to.

I will quickly check if I can share some of the code.

Thank you again for the help!
With best regards,
Shinhyung Yang
 

Re: Working with storm compatibility layer

Matthias J. Sax
Hi,

I just double checked the Flink code: during translation from Storm
to Flink, declareOutputFields() is called twice. You are right that it
does the same job twice, but that is actually not a problem. The Flink
code is cleaner this way, so I guess we will not change it.

About the lifecycle:
When you submit your code, during deployment, Spout.open() and
Bolt.prepare() should be called for each parallel instance of each
Spout/Bolt of your topology.
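
If you want to make those calls visible, a quick debugging sketch (the
log line is my addition, nothing the API requires) is to print the task
id in open() -- each parallel instance should print exactly once during
deployment:

@Override
public void open(Map conf, TopologyContext context,
        SpoutOutputCollector collector) {
    // one line per parallel spout instance, printed at deployment time
    System.out.println("open() called for task " + context.getThisTaskId());
}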

About your submission (I guess this should solve your current problem):
If you use bin/start-local.sh, you should *not* use FlinkLocalCluster,
but FlinkSubmitter. You have to distinguish three cases (see the sketch
after this list):

  - local/debug/IDE mode: use FlinkLocalCluster
    => you do not need to start any Flink cluster beforehand --
FlinkLocalCluster is started up in your current JVM
    * the purpose is local debugging in an IDE (this makes it easy to
set breakpoints and debug code)

  - pseudo-distributed mode: use FlinkSubmitter
    => you start up a local Flink cluster via bin/start-local.sh
    * this local Flink cluster runs in its own JVM and looks like a real
cluster to the Flink client, i.e., "bin/flink run"
    * thus, you just use FlinkSubmitter as for a real cluster (with
JobManager/Nimbus hostname "localhost")
    * in contrast to FlinkLocalCluster, no "internal Flink cluster" is
started in your current JVM; instead, your code is shipped to the local
cluster you started beforehand via bin/start-local.sh and executed in
that JVM

  - distributed mode: use FlinkSubmitter
    => you start up Flink on a real cluster using bin/start-cluster.sh
    * you use "bin/flink run" to submit your code to the real cluster
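
As a minimal sketch of the two code paths (the buildTopology() helper is
hypothetical and stands in for however you assemble your FlinkTopology):

import backtype.storm.Config;
import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkSubmitter;
import org.apache.flink.storm.api.FlinkTopology;
import org.apache.flink.storm.api.FlinkTopologyBuilder;

public class SubmitModes {
    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        FlinkTopology topology = buildTopology();

        if (args.length > 0 && args[0].equals("local")) {
            // local/debug/IDE mode: everything runs in the current JVM,
            // no bin/start-local.sh needed
            FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
            cluster.submitTopology("topology", conf, topology);
        } else {
            // (pseudo-)distributed mode: package as a jar and submit via
            // "bin/flink run" to a cluster started beforehand with
            // bin/start-local.sh or bin/start-cluster.sh
            FlinkSubmitter.submitTopology("topology", conf, topology);
        }
    }

    // hypothetical helper: wire up your own spouts and bolts here
    private static FlinkTopology buildTopology() {
        FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
        // builder.setSpout("src", new YourSpout());
        // builder.setBolt("bolt", new YourBolt()).shuffleGrouping("src");
        return builder.createTopology();
    }
}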


About further debugging: you can increase the log level to get more
information:
https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/logging.html

Hope this helps!

-Matthias

Re: Working with storm compatibility layer

Shinhyung Yang
Dear Matthias,

Thank you for the reply! I am sorry to respond so late on the matter.

> I just double checked the Flink code: during translation from Storm
> to Flink, declareOutputFields() is called twice. You are right that it
> does the same job twice, but that is actually not a problem. The Flink
> code is cleaner this way, so I guess we will not change it.

Thank you for checking. I don't think it contributed to my current
problem anyway. In my case, though, it is called 3 times, if the number
matters at all.

> About the lifecycle:
> When you submit your code, during deployment, Spout.open() and
> Bolt.prepare() should be called for each parallel instance of each
> Spout/Bolt of your topology.
>
> About your submission (I guess this should solve your current problem):
> If you use bin/start-local.sh, you should *not* use FlinkLocalCluster,
> but FlinkSubmitter. You have to distinguish three cases:
>
>   - local/debug/IDE mode: use FlinkLocalCluster
>     => you do not need to start any Flink cluster beforehand --
> FlinkLocalCluster is started up in your current JVM
>     * the purpose is local debugging in an IDE (this makes it easy to
> set breakpoints and debug code)
>
>   - pseudo-distributed mode: use FlinkSubmitter
>     => you start up a local Flink cluster via bin/start-local.sh
>     * this local Flink cluster runs in its own JVM and looks like a real
> cluster to the Flink client, i.e., "bin/flink run"
>     * thus, you just use FlinkSubmitter as for a real cluster (with
> JobManager/Nimbus hostname "localhost")
>     * in contrast to FlinkLocalCluster, no "internal Flink cluster" is
> started in your current JVM; instead, your code is shipped to the local
> cluster you started beforehand via bin/start-local.sh and executed in
> that JVM
>
>   - distributed mode: use FlinkSubmitter
>     => you start up Flink on a real cluster using bin/start-cluster.sh
>     * you use "bin/flink run" to submit your code to the real cluster

Thank you for the explanation; now I have a clearer understanding of
clusters and submitters. However, my problem is not fixed yet. Here's
my code:

////////////////////////////////////////////////////////////////////////////////
// ./src/main/java/myexample/App.java
////////////////////////////////////////////////////////////////////////////////

package myexample;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import myexample.spout.StandaloneSpout;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;

import myexample.bolt.Node;
import myexample.bolt.StandardBolt;

import java.util.Arrays;
import java.util.List;

import org.apache.flink.storm.api.FlinkTopology;
//import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkSubmitter;
//import org.apache.flink.storm.api.FlinkClient;
import org.apache.flink.storm.api.FlinkTopologyBuilder;

public class App
{
    public static void main( String[] args ) throws Exception
    {
        int layer = 0;
        StandaloneSpout spout = new StandaloneSpout();
        Config conf = new Config();
        conf.put(Config.TOPOLOGY_DEBUG, false);
        //FlinkLocalCluster cluster = new FlinkLocalCluster();
        //FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
        //LocalCluster cluster = new LocalCluster();

        layer = Integer.parseInt(args[0]);
        //cluster.submitTopology("topology", conf, BinaryTopology(spout, layer));
        FlinkSubmitter.submitTopology("topology", conf, BinaryTopology(spout, layer));
        //Thread.sleep(5 * 1000);
        //FlinkClient.getConfiguredClient(conf).killTopology("topology");
        //cluster.killTopology("topology");
        //cluster.shutdown();
    }

    public static FlinkTopology BinaryTopology(IRichSpout input, int n) {
    //public static StormTopology BinaryTopology(IRichSpout input, int n) {
        return BinaryTopology(input, n, Arrays.asList((BaseBasicBolt) new StandardBolt()));
    }

    public static FlinkTopology BinaryTopology(IRichSpout input, int n, List<BaseBasicBolt> boltList) {
    //public static StormTopology BinaryTopology(IRichSpout input, int n, List<BaseBasicBolt> boltList) {
        FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
        //TopologyBuilder builder = new TopologyBuilder();
        String sourceId = "src";
        builder.setSpout(sourceId, input);

        String boltId = "bolt";
        builder.setBolt(boltId, new Node(), Math.pow(2, n)).shuffleGrouping(sourceId);

        return builder.createTopology();
    }
}

////////////////////////////////////////////////////////////////////////////////
// ./src/main/java/myexample/spout/StandaloneSpout.java
////////////////////////////////////////////////////////////////////////////////

package myexample.spout;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.io.*;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;

public class StandaloneSpout extends BaseRichSpout {

    private SpoutOutputCollector mCollector;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.mCollector = collector;
    }

    @Override
    public void nextTuple() {
        long currentTime = System.currentTimeMillis();

        // TODO: Currently, do not check bound of list, because of experiment. (Avoid branch)
        mCollector.emit(new Values(new String("aaa"), System.currentTimeMillis(), 0));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        System.out.println("declareOutputFields");
        declarer.declare(new Fields("string1", "timestamp", "omitted"));
    }
}

////////////////////////////////////////////////////////////////////////////////
// ./src/main/java/myexample/bolt/Node.java
////////////////////////////////////////////////////////////////////////////////

package myexample.bolt;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;

public class Node extends BaseBasicBolt {

    public static boolean isTupleEmpty(Tuple tuple) {
        return false;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        super.prepare(stormConf, context);
    }

    @Override
    public void cleanup() {
        super.cleanup();
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        collector.emit(new Values("aaa", 1, System.currentTimeMillis(), 0));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("string1", "string2", "timestamp", "omitted"));
    }
}

////////////////////////////////////////////////////////////////////////////////
// ./src/main/java/myexample/bolt/StandardBolt.java
////////////////////////////////////////////////////////////////////////////////

package myexample.bolt;

import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class StandardBolt extends BaseBasicBolt {

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        super.prepare(stormConf, context);
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer ofd) {
    }
}

The problem is probably in the source code, or in something around the
project environment. I would really appreciate it if you could verify
whether the code looks OK.

>
> About further debugging: you can increase the log level to get more
> information:
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/logging.html

I tried to inject the `log4j.properties' file that I got from a sample
flink-quickstart-java application created with `mvn
archetype:generate' into the ./target/*.jar, but it does not work. I
tried this because placing that `log4j.properties' file under
./src/main/resources of my project did not work in the first place.
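
For reference, the file I tried is essentially the standard log4j 1.x
console setup (reproduced from memory, so take it as an approximation):

log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n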

Thank you again for your help.
With best regards,
Shinhyung

Re: Working with storm compatibility layer

Matthias J. Sax
Hi,

I can submit the topology without any problems. Your code is fine.

If your program "exits silently", I would actually assume that you
submitted the topology successfully. Can you see the topology in the
JobManager WebFrontend? If not, do you see any errors in the log files?

-Matthias

Re: Working with storm compatibility layer

Shinhyung Yang
Dear Matthias,

Thank you for the quick reply. It failed again; however, I was able to
access its WebFrontend, and it gave me some logs. I wanted to show the
logs right away before digging into them.

19:48:18,011 INFO  org.apache.flink.runtime.jobmanager.JobManager - Submitting job 6f52281fb987a3def8d3c01e1fc0bdcb (topology).
19:48:18,014 INFO  org.apache.flink.runtime.jobmanager.JobManager - Scheduling job 6f52281fb987a3def8d3c01e1fc0bdcb (topology).
19:48:18,014 INFO  org.apache.flink.runtime.jobmanager.JobManager - Status of job 6f52281fb987a3def8d3c01e1fc0bdcb (topology) changed to RUNNING.
19:48:18,014 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from CREATED to SCHEDULED
19:48:18,014 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from SCHEDULED to DEPLOYING
19:48:18,015 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: src (1/1) (attempt #0) to localhost
19:48:18,015 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from CREATED to SCHEDULED
19:48:18,015 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from SCHEDULED to DEPLOYING
19:48:18,015 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying bolt (1/8) (attempt #0) to localhost
19:48:18,015 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Received task Source: src (1/1)
19:48:18,015 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt (2/8) (8c7c675e42deeef5e9ad17581bafe457) switched from CREATED to SCHEDULED
19:48:18,016 INFO  org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task Source: src (1/1)
19:48:18,017 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Received task bolt (1/8)
19:48:18,017 INFO  org.apache.flink.runtime.blob.BlobCache - Downloading fac8ddfb4668c9f76559766001b7c9fd07cbd0bc from localhost/127.0.0.1:36498
19:48:18,017 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from DEPLOYING to CANCELING
19:48:18,018 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from DEPLOYING to CANCELING
19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Source: src (1/1)
19:48:18,018 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt (2/8) (8c7c675e42deeef5e9ad17581bafe457) switched from SCHEDULED to CANCELED
19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task - Source: src (1/1) switched to CANCELING
19:48:18,018 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt (3/8) (0fed2e4059a37fe25b8b15480547a550) switched from CREATED to CANCELED
19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task bolt (1/8)
19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task - bolt (1/8) switched to CANCELING
19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task bolt (1/8)
19:48:18,018 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt (4/8) (647c524f0a73b936656d6c6a67ecfbc9) switched from CREATED to CANCELED
19:48:18,018 INFO  org.apache.flink.runtime.jobmanager.JobManager - Status of job 6f52281fb987a3def8d3c01e1fc0bdcb (topology) changed to FAILING.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: < Attempt #0 (bolt (2/8)) @ (unassigned) - [SCHEDULED] > with groupID < 52431fe365bb7d0f12dd8d20e2528427 > in sharing group < SlotSharingGroup [52431fe365bb7d0f12dd8d20e2528427, be3fd4192759bf8d5b39e028f005d7f4] >. Resources available to scheduler: Number of instances=1, total number of slots=1, available slots=0
at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:256)
at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleImmediately(Scheduler.java:131)
at org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:298)
at org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:458)
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.scheduleAll(ExecutionJobVertex.java:322)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:686)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:982)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:962)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:962)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
19:48:18,019 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt (5/8) (4cb013017e278161124c3c6549cd3f80) switched from CREATED to CANCELED
19:48:18,020 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt (6/8) (a8737f034530b66a44e8c0a2cd60528d) switched from CREATED to CANCELED
19:48:18,020 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt (7/8) (495b895a7f338647b876640f91d823d6) switched from CREATED to CANCELED
19:48:18,020 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt (8/8) (0c45cccb3f5b786e4bdacd20d3e164c8) switched from CREATED to CANCELED
19:48:18,526 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@127.0.0.1:50370] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
19:48:18,527 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@127.0.0.1:54918] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
19:48:18,531 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@127.0.0.1:53543] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
19:48:18,768 INFO  org.apache.flink.runtime.taskmanager.Task - Source: src (1/1) switched to CANCELED
19:48:18,768 INFO  org.apache.flink.runtime.taskmanager.Task - bolt (1/8) switched to CANCELED
19:48:18,768 INFO  org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: src (1/1)
19:48:18,768 INFO  org.apache.flink.runtime.taskmanager.Task - Freeing task resources for bolt (1/8)
19:48:18,769 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Unregistering task and sending final execution state CANCELED to JobManager for task Source: src (3535644576ae695d2685a65401e16fc4)
19:48:18,769 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Unregistering task and sending final execution state CANCELED to JobManager for task bolt (391ac2875a2fdc86d8af4f2d51e3e849)
19:48:18,770 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from CANCELING to CANCELED
19:48:18,770 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from CANCELING to CANCELED
19:48:18,773 INFO  org.apache.flink.runtime.jobmanager.JobManager - Status of job 6f52281fb987a3def8d3c01e1fc0bdcb (topology) changed to FAILED.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: < Attempt #0 (bolt (2/8)) @ (unassigned) - [SCHEDULED] > with groupID < 52431fe365bb7d0f12dd8d20e2528427 > in sharing group < SlotSharingGroup [52431fe365bb7d0f12dd8d20e2528427, be3fd4192759bf8d5b39e028f005d7f4] >. Resources available to scheduler: Number of instances=1, total number of slots=1, available slots=0
at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:256)
at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleImmediately(Scheduler.java:131)
at org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:298)
at org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:458)
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.scheduleAll(ExecutionJobVertex.java:322)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:686)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:982)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:962)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:962)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

From a quick look at the log, it seems like my setup has some problem?
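
If I read the NoResourceAvailableException right, the bolt is scheduled
with parallelism 8 but my TaskManager offers only one slot, so following
the exception's own suggestion I would either lower the parallelism or
add slots -- presumably something like this in conf/flink-conf.yaml (my
guess at the fix, not yet verified):

# conf/flink-conf.yaml
# assumption: the default single task slot is the bottleneck; the bolt
# runs with parallelism 2^3 = 8, so offer at least 8 slots per TaskManager
taskmanager.numberOfTaskSlots: 8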

Thank you.
Best regards,
Shinhyung

On Thu, Jan 14, 2016 at 7:29 PM, Matthias J. Sax <[hidden email]> wrote:

> Hi,
>
> I can submit the topology without any problems. Your code is fine.
>
> If your program "exits silently" I would actually assume, that you
> submitted the topology successfully. Can you see the topology in
> JobManager WebFrontend? If not, do you see any errors in the log files?
>
> -Matthias
>
> On 01/14/2016 07:37 AM, Shinhyung Yang wrote:
>> Dear Matthias,
>>
>> Thank you for the reply! I am so sorry to respond late on the matter.
>>
>>> I just double checked the Flink code and during translation from Storm
>>> to Flink declareOuputFields() is called twice. You are right that is
>>> does the same job twice, but that is actually not a problem. The Flink
>>> code is cleaner this way to I guess we will not change it.
>>
>> Thank you for checking. I don't think it contributed any part of my
>> current problem anyways. For my case though, it is called 3 times if
>> the number is important at all.
>>
>>> About lifecyle:
>>> If you submit your code, during deployment, Spout.open() and
>>> Bolt.prepare() should be called for each parallel instance on each
>>> Spout/Bolt of your topology.
>>>
>>> About your submission (I guess this should solve your current problem):
>>> If you use bin/start-local.sh, you should *not* use FlinkLocalCluster,
>>> but FlinkSubmitter. You have to distinguish three cases:
>>>
>>>   - local/debug/IDE mode: use FlinkLocalCluster
>>>     => you do not need to start any Flink cluster before --
>>> FlinkLocalCluster is started up in you current JVM
>>>     * the purpose is local debugging in an IDE (this allows to easily
>>> set break points and debug code)
>>>
>>>   - pseudo-distributed mode: use FlinkSubmitter
>>>     => you start up a local Flink cluster via bin/start-local.sh
>>>     * this local Flink cluster run in an own JVM and looks like a real
>>> cluster to the Flink client, ie, "bin/flink run"
>>>     * thus, you just use FlinkSubmitter as for a real cluster (with
>>> JobManager/Nimbus hostname "localhost")
>>>     * in contrast to FlinkLocalCluster, no "internal Flink Cluster" is
>>> started in your current JVM, but your code is shipped to the local
>>> cluster you started up beforehand via bin/start-local.sh and executed in
>>> this JVM
>>>
>>>   - distributed mode: use FlinkSubmitter
>>>     => you start up Flink in a real cluster using bin/start-cluster.sh
>>>     * you use "bin/flink run" to submit your code to the real cluster
>>
>> Thank you for the explanation, now I have clearer understanding of
>> clusters and submitters. However my problem is not fixed yet. Here's
>> my code:
>>
>> ////////////////////////////////////////////////////////////////////////////////
>> // ./src/main/java/myexample/App.java
>> ////////////////////////////////////////////////////////////////////////////////
>>
>> package myexample;
>>
>> import backtype.storm.Config;
>> import backtype.storm.LocalCluster;
>> import myexample.spout.StandaloneSpout;
>> import backtype.storm.generated.StormTopology;
>> import backtype.storm.topology.IRichSpout;
>> import backtype.storm.topology.TopologyBuilder;
>> import backtype.storm.topology.base.BaseBasicBolt;
>>
>> import myexample.bolt.Node;
>> import myexample.bolt.StandardBolt;
>>
>> import java.util.Arrays;
>> import java.util.List;
>>
>> //
>> import org.apache.flink.storm.api.FlinkTopology;
>> //import org.apache.flink.storm.api.FlinkLocalCluster;
>> import org.apache.flink.storm.api.FlinkSubmitter;
>> //import org.apache.flink.storm.api.FlinkClient;
>> import org.apache.flink.storm.api.FlinkTopologyBuilder;
>>
>> public class App
>> {
>>     public static void main( String[] args ) throws Exception
>>     {
>>         int layer = 0;
>>         StandaloneSpout spout = new StandaloneSpout();
>>         Config conf = new Config();
>>         conf.put(Config.TOPOLOGY_DEBUG, false);
>>         //FlinkLocalCluster cluster = new FlinkLocalCluster();
>>         //FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
>>         //LocalCluster cluster = new LocalCluster();
>>
>>         layer = Integer.parseInt(args[0]);
>>         //cluster.submitTopology("topology", conf,
>> BinaryTopology(spout, layer));
>>         FlinkSubmitter.submitTopology("topology", conf,
>> BinaryTopology(spout, layer));
>>         //Thread.sleep(5 * 1000);
>>         //FlinkClient.getConfiguredClient(conf).killTopology("topology");
>>         //cluster.killTopology("topology");
>>         //cluster.shutdown();
>>     }
>>
>>     public static FlinkTopology BinaryTopology(IRichSpout input, int n) {
>>     //public static StormTopology BinaryTopology(IRichSpout input, int n) {
>>         return BinaryTopology(input, n,
>> Arrays.asList((BaseBasicBolt)new StandardBolt()));
>>     }
>>
>>     public static FlinkTopology BinaryTopology(IRichSpout input, int
>> n, List<BaseBasicBolt> boltList) {
>>     //public static StormTopology BinaryTopology(IRichSpout input, int
>> n, List<BaseBasicBolt> boltList) {
>>         FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
>>         //TopologyBuilder builder = new TopologyBuilder();
>>         String sourceId = "src";
>>         builder.setSpout(sourceId, input);
>>
>>
>>         String boltId = "bolt";
>>         builder.setBolt(boltId, new Node(), Math.pow(2,
>> n)).shuffleGrouping(sourceId);
>>
>>         return builder.createTopology();
>>     }
>> }
>>
>> ////////////////////////////////////////////////////////////////////////////////
>> // ./src/main/java/myexample/spout/StandaloneSpout.java
>> ////////////////////////////////////////////////////////////////////////////////
>>
>> package myexample.spout;
>>
>> import backtype.storm.spout.SpoutOutputCollector;
>> import backtype.storm.task.TopologyContext;
>> import backtype.storm.topology.OutputFieldsDeclarer;
>> import backtype.storm.topology.base.BaseRichSpout;
>> import backtype.storm.tuple.Fields;
>> import backtype.storm.tuple.Values;
>>
>> import java.io.*;
>> import java.text.DateFormat;
>> import java.text.SimpleDateFormat;
>> import java.util.*;
>>
>> public class StandaloneSpout extends BaseRichSpout {
>>
>>     private SpoutOutputCollector mCollector;
>>
>>     @Override
>>     public void open(Map conf, TopologyContext context,
>> SpoutOutputCollector collector) {
>>         this.mCollector = collector;
>>     }
>>
>>     @Override
>>     public void nextTuple() {
>>         long currentTime = System.currentTimeMillis();
>>
>>         // TODO: Currently, do not check bound of list, because of
>> experiment.(Avoid branch)
>>         mCollector.emit(new Values(new String("aaa"),
>> System.currentTimeMillis(), 0));
>>
>>     }
>>
>>     @Override
>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>         System.out.println("declareOutputFields");
>>         declarer.declare(new Fields("string1", "timestamp", "omitted"));
>>     }
>> }
>>
>> ////////////////////////////////////////////////////////////////////////////////
>> // ./src/main/java/myexample/bolt/Node.java
>> ////////////////////////////////////////////////////////////////////////////////
>>
>> package myexample.bolt;
>>
>> import backtype.storm.task.TopologyContext;
>> import backtype.storm.topology.BasicOutputCollector;
>> import backtype.storm.topology.OutputFieldsDeclarer;
>> import backtype.storm.topology.base.BaseBasicBolt;
>> import backtype.storm.tuple.Fields;
>> import backtype.storm.tuple.Tuple;
>> import backtype.storm.tuple.Values;
>> import java.util.Map;
>>
>> public class Node extends BaseBasicBolt {
>>
>>     public static boolean isTupleEmpty(Tuple tuple) {
>>       return false;
>>     }
>>
>>     @Override
>>     public void prepare(Map stormConf, TopologyContext context) {
>>         super.prepare(stormConf, context);
>>     }
>>
>>     @Override
>>     public void cleanup() {
>>         super.cleanup();
>>     }
>>
>>     @Override
>>     public void execute(Tuple tuple, BasicOutputCollector collector) {
>>         collector.emit(new Values("aaa", 1, System.currentTimeMillis(), 0));
>>     }
>>
>>     @Override
>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>         declarer.declare(new Fields("string1", "string2", "timestamp",
>> "omitted"));
>>     }
>> }
>>
>> ////////////////////////////////////////////////////////////////////////////////
>> // ./src/main/java/myexample/bolt/StandardBolt.java
>> ////////////////////////////////////////////////////////////////////////////////
>>
>> package myexample.bolt;
>>
>> import java.util.Map;
>>
>> import backtype.storm.task.TopologyContext;
>> import backtype.storm.topology.BasicOutputCollector;
>> import backtype.storm.topology.OutputFieldsDeclarer;
>> import backtype.storm.topology.base.BaseBasicBolt;
>> import backtype.storm.tuple.Tuple;
>>
>> public class StandardBolt extends BaseBasicBolt {
>>
>>     @Override
>>     public void prepare(Map stormConf, TopologyContext context) {
>>         super.prepare(stormConf, context);
>>     }
>>
>>     @Override
>>     public void execute(Tuple tuple, BasicOutputCollector collector) {
>>     }
>>
>>     @Override
>>     public void declareOutputFields(OutputFieldsDeclarer ofd) {
>>     }
>> }
>>
>> Probably the source code has the problem, or other things around the
>> project environment might contain it. I would really appreciate it if
>> you could verify whether the code looks OK or not.
>>
>>>
>>> About further debugging: you can increase the log level to get more
>>> information:
>>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/logging.html
>>
>> I tried to inject the `log4j.properties' file that I got from a sample
>> flink-quickstart-java application created with `mvn
>> archetype:generate' into a ./target/*.jar, but it does not work. I tried
>> this because placing that `log4j.properties' file under
>> ./src/main/resources of my project did not work in the first place.
>>
>> Thank you again for your help.
>> With best regards,
>> Shinhyung
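For reference: when the job runs on a cluster started with bin/start-local.sh, the JobManager and TaskManager JVMs take their logging configuration from conf/log4j.properties in the Flink distribution, so a log4j.properties packaged into the job jar should have no effect there -- which would explain why injecting it into the jar did not work. A minimal sketch of such a file, assuming the standard log4j 1.x console appender (modeled on, not copied from, the quickstart file):

    log4j.rootLogger=INFO, console

    # Write everything to stdout; for code running inside the
    # TaskManager this ends up in the log/*.out file.
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n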
>>
>>> Hope this helps!
>>>
>>> -Matthias
>>>
Reply | Threaded
Open this post in threaded view
|

Re: Working with storm compatibility layer

Shinhyung Yang
Dear Matthias,

Thank you very much again. I changed the value of
taskmanager.numberOfTaskSlots from 1 to 128 in
flink-0.10.1/conf/flink-conf.yaml, stopped the local cluster, and
started it again. Now it works fine. (It is still running and I can
see it clearly on the WebFrontend.) Although I'm not sure whether it
is safe to keep the value this high or not.

Thank you.
Best regards,
Shinhyung
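For what it's worth, a rough sketch of the sizing logic, assuming Flink's default slot sharing: a job needs as many task slots as its maximum operator parallelism, so this topology (source parallelism 1, bolt parallelism 8) needs at least 8 free slots. Any value of 8 or more would do here; 128 simply leaves plenty of headroom, and a common guideline is one slot per CPU core. The relevant line in conf/flink-conf.yaml:

    taskmanager.numberOfTaskSlots: 128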

On Thu, Jan 14, 2016 at 7:53 PM, Shinhyung Yang
<[hidden email]> wrote:

> Dear Matthias,
>
> Thank you for the quick reply. It failed again; however, I was able to
> access its WebFrontend and it gave me some logs. I wanted to show the
> logs immediately, before digging into it.
>
> 19:48:18,011 INFO  org.apache.flink.runtime.jobmanager.JobManager
>           - Submitting job 6f52281fb987a3def8d3c01e1fc0bdcb
> (topology).
> 19:48:18,014 INFO  org.apache.flink.runtime.jobmanager.JobManager
>           - Scheduling job 6f52281fb987a3def8d3c01e1fc0bdcb
> (topology).
> 19:48:18,014 INFO  org.apache.flink.runtime.jobmanager.JobManager
>           - Status of job 6f52281fb987a3def8d3c01e1fc0bdcb (topology)
> changed to RUNNING.
> 19:48:18,014 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
> Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from
> CREATED to SCHEDULED
> 19:48:18,014 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
> Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from
> SCHEDULED to DEPLOYING
> 19:48:18,015 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
> Deploying Source: src (1/1) (attempt #0) to localhost
> 19:48:18,015 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
> (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from CREATED to
> SCHEDULED
> 19:48:18,015 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
> (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from SCHEDULED to
> DEPLOYING
> 19:48:18,015 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
> Deploying bolt (1/8) (attempt #0) to localhost
> 19:48:18,015 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>           - Received task Source: src (1/1)
> 19:48:18,015 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
> (2/8) (8c7c675e42deeef5e9ad17581bafe457) switched from CREATED to
> SCHEDULED
> 19:48:18,016 INFO  org.apache.flink.runtime.taskmanager.Task
>           - Loading JAR files for task Source: src (1/1)
> 19:48:18,017 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>           - Received task bolt (1/8)
> 19:48:18,017 INFO  org.apache.flink.runtime.blob.BlobCache
>           - Downloading fac8ddfb4668c9f76559766001b7c9fd07cbd0bc from
> localhost/127.0.0.1:36498
> 19:48:18,017 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
> Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from
> DEPLOYING to CANCELING
> 19:48:18,018 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
> (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from DEPLOYING to
> CANCELING
> 19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
>           - Attempting to cancel task Source: src (1/1)
> 19:48:18,018 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
> (2/8) (8c7c675e42deeef5e9ad17581bafe457) switched from SCHEDULED to
> CANCELED
> 19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
>           - Source: src (1/1) switched to CANCELING
> 19:48:18,018 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
> (3/8) (0fed2e4059a37fe25b8b15480547a550) switched from CREATED to
> CANCELED
> 19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
>           - Attempting to cancel task bolt (1/8)
> 19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
>           - bolt (1/8) switched to CANCELING
> 19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
>           - Loading JAR files for task bolt (1/8)
> 19:48:18,018 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
> (4/8) (647c524f0a73b936656d6c6a67ecfbc9) switched from CREATED to
> CANCELED
> 19:48:18,018 INFO  org.apache.flink.runtime.jobmanager.JobManager
>           - Status of job 6f52281fb987a3def8d3c01e1fc0bdcb (topology)
> changed to FAILING.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Not enough free slots available to run the job. You can decrease the
> operator parallelism or increase the number of slots per TaskManager
> in the configuration. Task to schedule: < Attempt #0 (bolt (2/8)) @
> (unassigned) - [SCHEDULED] > with groupID <
> 52431fe365bb7d0f12dd8d20e2528427 > in sharing group < SlotSharingGroup
> [52431fe365bb7d0f12dd8d20e2528427, be3fd4192759bf8d5b39e028f005d7f4]
>>. Resources available to scheduler: Number of instances=1, total
> number of slots=1, available slots=0
> at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:256)
> at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleImmediately(Scheduler.java:131)
> at org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:298)
> at org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:458)
> at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.scheduleAll(ExecutionJobVertex.java:322)
> at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:686)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:982)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:962)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:962)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 19:48:18,019 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
> (5/8) (4cb013017e278161124c3c6549cd3f80) switched from CREATED to
> CANCELED
> 19:48:18,020 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
> (6/8) (a8737f034530b66a44e8c0a2cd60528d) switched from CREATED to
> CANCELED
> 19:48:18,020 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
> (7/8) (495b895a7f338647b876640f91d823d6) switched from CREATED to
> CANCELED
> 19:48:18,020 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
> (8/8) (0c45cccb3f5b786e4bdacd20d3e164c8) switched from CREATED to
> CANCELED
> 19:48:18,526 WARN  akka.remote.ReliableDeliverySupervisor
>           - Association with remote system
> [akka.tcp://flink@127.0.0.1:50370] has failed, address is now gated
> for [5000] ms. Reason is: [Disassociated].
> 19:48:18,527 WARN  akka.remote.ReliableDeliverySupervisor
>           - Association with remote system
> [akka.tcp://flink@127.0.0.1:54918] has failed, address is now gated
> for [5000] ms. Reason is: [Disassociated].
> 19:48:18,531 WARN  akka.remote.ReliableDeliverySupervisor
>           - Association with remote system
> [akka.tcp://flink@127.0.0.1:53543] has failed, address is now gated
> for [5000] ms. Reason is: [Disassociated].
> 19:48:18,768 INFO  org.apache.flink.runtime.taskmanager.Task
>           - Source: src (1/1) switched to CANCELED
> 19:48:18,768 INFO  org.apache.flink.runtime.taskmanager.Task
>           - bolt (1/8) switched to CANCELED
> 19:48:18,768 INFO  org.apache.flink.runtime.taskmanager.Task
>           - Freeing task resources for Source: src (1/1)
> 19:48:18,768 INFO  org.apache.flink.runtime.taskmanager.Task
>           - Freeing task resources for bolt (1/8)
> 19:48:18,769 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>           - Unregistering task and sending final execution state
> CANCELED to JobManager for task Source: src
> (3535644576ae695d2685a65401e16fc4)
> 19:48:18,769 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>           - Unregistering task and sending final execution state
> CANCELED to JobManager for task bolt
> (391ac2875a2fdc86d8af4f2d51e3e849)
> 19:48:18,770 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
> (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from CANCELING to
> CANCELED
> 19:48:18,770 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
> Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from
> CANCELING to CANCELED
> 19:48:18,773 INFO  org.apache.flink.runtime.jobmanager.JobManager
>           - Status of job 6f52281fb987a3def8d3c01e1fc0bdcb (topology)
> changed to FAILED.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Not enough free slots available to run the job. You can decrease the
> operator parallelism or increase the number of slots per TaskManager
> in the configuration. Task to schedule: < Attempt #0 (bolt (2/8)) @
> (unassigned) - [SCHEDULED] > with groupID <
> 52431fe365bb7d0f12dd8d20e2528427 > in sharing group < SlotSharingGroup
> [52431fe365bb7d0f12dd8d20e2528427, be3fd4192759bf8d5b39e028f005d7f4]
>>. Resources available to scheduler: Number of instances=1, total
> number of slots=1, available slots=0
> at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:256)
> at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleImmediately(Scheduler.java:131)
> at org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:298)
> at org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:458)
> at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.scheduleAll(ExecutionJobVertex.java:322)
> at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:686)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:982)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:962)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:962)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> From a quick look at the log, it seems like my setup has some problem?
>
> Thank you.
> Best regards,
> Shinhyung
>
> On Thu, Jan 14, 2016 at 7:29 PM, Matthias J. Sax <[hidden email]> wrote:
>> Hi,
>>
>> I can submit the topology without any problems. Your code is fine.
>>
>> If your program "exits silently", I would actually assume that you
>> submitted the topology successfully. Can you see the topology in
>> JobManager WebFrontend? If not, do you see any errors in the log files?
>>
>> -Matthias
>>
>> On 01/14/2016 07:37 AM, Shinhyung Yang wrote:
>>> Dear Matthias,
>>>
>>> Thank you for the reply! I am so sorry to respond late on the matter.
>>>
>>>> I just double-checked the Flink code: during translation from Storm
>>>> to Flink, declareOutputFields() is called twice. You are right that it
>>>> does the same job twice, but that is actually not a problem. The Flink
>>>> code is cleaner this way, so I guess we will not change it.
>>>
>>> Thank you for checking. I don't think it contributed to my current
>>> problem anyway. In my case, though, it is called 3 times, if the
>>> number matters at all.
>>>
>>>> About the lifecycle:
>>>> When you submit your code, during deployment, Spout.open() and
>>>> Bolt.prepare() should be called once for each parallel instance of each
>>>> Spout/Bolt in your topology.
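As a hedged sketch of how to check this lifecycle without a debugger (getThisTaskId() is standard Storm API; the log location is Flink's default layout), a marker in the spout shared above would print once per parallel instance into the TaskManager's log/*.out file rather than onto the client console:

    @Override
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        // Printed once per parallel spout instance; with bin/flink run
        // this lands in the TaskManager's log/*.out, not in the CLI output.
        System.out.println("open() called, task id = " + context.getThisTaskId());
        this.mCollector = collector;
    }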
>>>>
>>>> About your submission (I guess this should solve your current problem):
>>>> If you use bin/start-local.sh, you should *not* use FlinkLocalCluster,
>>>> but FlinkSubmitter. You have to distinguish three cases:
>>>>
>>>>   - local/debug/IDE mode: use FlinkLocalCluster
>>>>     => you do not need to start any Flink cluster beforehand --
>>>> FlinkLocalCluster is started up in your current JVM
>>>>     * the purpose is local debugging in an IDE (this makes it easy to
>>>> set breakpoints and debug code)
>>>>
>>>>   - pseudo-distributed mode: use FlinkSubmitter (see the sketch after
>>>> this list)
>>>>     => you start up a local Flink cluster via bin/start-local.sh
>>>>     * this local Flink cluster runs in its own JVM and looks like a real
>>>> cluster to the Flink client, i.e., "bin/flink run"
>>>>     * thus, you just use FlinkSubmitter as for a real cluster (with
>>>> JobManager/Nimbus hostname "localhost")
>>>>     * in contrast to FlinkLocalCluster, no "internal Flink cluster" is
>>>> started in your current JVM; instead, your code is shipped to the local
>>>> cluster you started beforehand via bin/start-local.sh and executed in
>>>> that JVM
>>>>
>>>>   - distributed mode: use FlinkSubmitter
>>>>     => you start up Flink on a real cluster using bin/start-cluster.sh
>>>>     * you use "bin/flink run" to submit your code to the real cluster
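A sketch of that pseudo-distributed path using the code from this thread (the jar name is a placeholder for whatever the build actually produces):

    bin/start-local.sh
    bin/flink run -c myexample.App target/myexample.jar 3

Here myexample.App calls FlinkSubmitter.submitTopology(), which ships the jar to the JobManager on localhost; the argument 3 is the layer count, giving 2^3 = 8 bolt instances.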
>>>
>>> Thank you for the explanation; now I have a clearer understanding of
>>> clusters and submitters. However, my problem is not fixed yet.
Reply | Threaded
Open this post in threaded view
|

Re: Working with storm compatibility layer

Matthias J. Sax-2
In reply to this post by Shinhyung Yang
Hi,

the log shows:

> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Resources available to scheduler: Number of instances=1, total number of slots=1, available slots=0

You need to increase the number of task slots in conf/flink-conf.yaml. Look
for the parameter "taskmanager.numberOfTaskSlots".


-Matthias
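As a sketch of the other option the exception text offers (decreasing the operator parallelism so the job fits into the available slots), the parallelism hint in the BinaryTopology method shared earlier in this thread could simply be capped:

    // With taskmanager.numberOfTaskSlots: 1, the maximum operator
    // parallelism must also be 1 for the job to get scheduled.
    String boltId = "bolt";
    builder.setBolt(boltId, new Node(), 1).shuffleGrouping(sourceId);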


On 01/14/2016 11:53 AM, Shinhyung Yang wrote:

> Dear Matthias,
>
> Thank you for a quick reply. It failed again, however I was able to
> access to its WebFrontend and it gave me some logs. I wanted to show
> logs immediately before digging down into it.
>
> 19:48:18,011 INFO  org.apache.flink.runtime.jobmanager.JobManager
>           - Submitting job 6f52281fb987a3def8d3c01e1fc0bdcb
> (topology).
> 19:48:18,014 INFO  org.apache.flink.runtime.jobmanager.JobManager
>           - Scheduling job 6f52281fb987a3def8d3c01e1fc0bdcb
> (topology).
> 19:48:18,014 INFO  org.apache.flink.runtime.jobmanager.JobManager
>           - Status of job 6f52281fb987a3def8d3c01e1fc0bdcb (topology)
> changed to RUNNING.
> 19:48:18,014 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
> Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from
> CREATED to SCHEDULED
> 19:48:18,014 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
> Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from
> SCHEDULED to DEPLOYING
> 19:48:18,015 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
> Deploying Source: src (1/1) (attempt #0) to localhost
> 19:48:18,015 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
> (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from CREATED to
> SCHEDULED
> 19:48:18,015 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
> (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from SCHEDULED to
> DEPLOYING
> 19:48:18,015 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
> Deploying bolt (1/8) (attempt #0) to localhost
> 19:48:18,015 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>           - Received task Source: src (1/1)
> 19:48:18,015 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
> (2/8) (8c7c675e42deeef5e9ad17581bafe457) switched from CREATED to
> SCHEDULED
> 19:48:18,016 INFO  org.apache.flink.runtime.taskmanager.Task
>           - Loading JAR files for task Source: src (1/1)
> 19:48:18,017 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>           - Received task bolt (1/8)
> 19:48:18,017 INFO  org.apache.flink.runtime.blob.BlobCache
>           - Downloading fac8ddfb4668c9f76559766001b7c9fd07cbd0bc from
> localhost/127.0.0.1:36498
> 19:48:18,017 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
> Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from
> DEPLOYING to CANCELING
> 19:48:18,018 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
> (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from DEPLOYING to
> CANCELING
> 19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
>           - Attempting to cancel task Source: src (1/1)
> 19:48:18,018 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
> (2/8) (8c7c675e42deeef5e9ad17581bafe457) switched from SCHEDULED to
> CANCELED
> 19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
>           - Source: src (1/1) switched to CANCELING
> 19:48:18,018 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
> (3/8) (0fed2e4059a37fe25b8b15480547a550) switched from CREATED to
> CANCELED
> 19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
>           - Attempting to cancel task bolt (1/8)
> 19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
>           - bolt (1/8) switched to CANCELING
> 19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
>           - Loading JAR files for task bolt (1/8)
> 19:48:18,018 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
> (4/8) (647c524f0a73b936656d6c6a67ecfbc9) switched from CREATED to
> CANCELED
> 19:48:18,018 INFO  org.apache.flink.runtime.jobmanager.JobManager
>           - Status of job 6f52281fb987a3def8d3c01e1fc0bdcb (topology)
> changed to FAILING.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Not enough free slots available to run the job. You can decrease the
> operator parallelism or increase the number of slots per TaskManager
> in the configuration. Task to schedule: < Attempt #0 (bolt (2/8)) @
> (unassigned) - [SCHEDULED] > with groupID <
> 52431fe365bb7d0f12dd8d20e2528427 > in sharing group < SlotSharingGroup
> [52431fe365bb7d0f12dd8d20e2528427, be3fd4192759bf8d5b39e028f005d7f4]
>> . Resources available to scheduler: Number of instances=1, total
> number of slots=1, available slots=0
> at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:256)
> at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleImmediately(Scheduler.java:131)
> at org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:298)
> at org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:458)
> at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.scheduleAll(ExecutionJobVertex.java:322)
> at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:686)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:982)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:962)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:962)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 19:48:18,019 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
> (5/8) (4cb013017e278161124c3c6549cd3f80) switched from CREATED to
> CANCELED
> 19:48:18,020 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
> (6/8) (a8737f034530b66a44e8c0a2cd60528d) switched from CREATED to
> CANCELED
> 19:48:18,020 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
> (7/8) (495b895a7f338647b876640f91d823d6) switched from CREATED to
> CANCELED
> 19:48:18,020 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
> (8/8) (0c45cccb3f5b786e4bdacd20d3e164c8) switched from CREATED to
> CANCELED
> 19:48:18,526 WARN  akka.remote.ReliableDeliverySupervisor
>           - Association with remote system
> [akka.tcp://flink@127.0.0.1:50370] has failed, address is now gated
> for [5000] ms. Reason is: [Disassociated].
> 19:48:18,527 WARN  akka.remote.ReliableDeliverySupervisor
>           - Association with remote system
> [akka.tcp://flink@127.0.0.1:54918] has failed, address is now gated
> for [5000] ms. Reason is: [Disassociated].
> 19:48:18,531 WARN  akka.remote.ReliableDeliverySupervisor
>           - Association with remote system
> [akka.tcp://flink@127.0.0.1:53543] has failed, address is now gated
> for [5000] ms. Reason is: [Disassociated].
> 19:48:18,768 INFO  org.apache.flink.runtime.taskmanager.Task
>           - Source: src (1/1) switched to CANCELED
> 19:48:18,768 INFO  org.apache.flink.runtime.taskmanager.Task
>           - bolt (1/8) switched to CANCELED
> 19:48:18,768 INFO  org.apache.flink.runtime.taskmanager.Task
>           - Freeing task resources for Source: src (1/1)
> 19:48:18,768 INFO  org.apache.flink.runtime.taskmanager.Task
>           - Freeing task resources for bolt (1/8)
> 19:48:18,769 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>           - Unregistering task and sending final execution state
> CANCELED to JobManager for task Source: src
> (3535644576ae695d2685a65401e16fc4)
> 19:48:18,769 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>           - Unregistering task and sending final execution state
> CANCELED to JobManager for task bolt
> (391ac2875a2fdc86d8af4f2d51e3e849)
> 19:48:18,770 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
> (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from CANCELING to
> CANCELED
> 19:48:18,770 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
> Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from
> CANCELING to CANCELED
> 19:48:18,773 INFO  org.apache.flink.runtime.jobmanager.JobManager
>           - Status of job 6f52281fb987a3def8d3c01e1fc0bdcb (topology)
> changed to FAILED.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Not enough free slots available to run the job. You can decrease the
> operator parallelism or increase the number of slots per TaskManager
> in the configuration. Task to schedule: < Attempt #0 (bolt (2/8)) @
> (unassigned) - [SCHEDULED] > with groupID <
> 52431fe365bb7d0f12dd8d20e2528427 > in sharing group < SlotSharingGroup
> [52431fe365bb7d0f12dd8d20e2528427, be3fd4192759bf8d5b39e028f005d7f4]
>> . Resources available to scheduler: Number of instances=1, total
> number of slots=1, available slots=0
> at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:256)
> at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleImmediately(Scheduler.java:131)
> at org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:298)
> at org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:458)
> at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.scheduleAll(ExecutionJobVertex.java:322)
> at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:686)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:982)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:962)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:962)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> It seems like my setting has some problem from a quick look at the log?
>
> Thank you.
> Best regards,
> Shinhyung
>
> On Thu, Jan 14, 2016 at 7:29 PM, Matthias J. Sax <[hidden email]> wrote:
>> Hi,
>>
>> I can submit the topology without any problems. Your code is fine.
>>
>> If your program "exits silently" I would actually assume, that you
>> submitted the topology successfully. Can you see the topology in
>> JobManager WebFrontend? If not, do you see any errors in the log files?
>>
>> -Matthias
>>
>> On 01/14/2016 07:37 AM, Shinhyung Yang wrote:
>>> Dear Matthias,
>>>
>>> Thank you for the reply! I am so sorry to respond late on the matter.
>>>
>>>> I just double checked the Flink code and during translation from Storm
>>>> to Flink declareOuputFields() is called twice. You are right that is
>>>> does the same job twice, but that is actually not a problem. The Flink
>>>> code is cleaner this way to I guess we will not change it.
>>>
>>> Thank you for checking. I don't think it contributed any part of my
>>> current problem anyways. For my case though, it is called 3 times if
>>> the number is important at all.
>>>
>>>> About lifecyle:
>>>> If you submit your code, during deployment, Spout.open() and
>>>> Bolt.prepare() should be called for each parallel instance on each
>>>> Spout/Bolt of your topology.
>>>>
>>>> About your submission (I guess this should solve your current problem):
>>>> If you use bin/start-local.sh, you should *not* use FlinkLocalCluster,
>>>> but FlinkSubmitter. You have to distinguish three cases:
>>>>
>>>>   - local/debug/IDE mode: use FlinkLocalCluster
>>>>     => you do not need to start any Flink cluster before --
>>>> FlinkLocalCluster is started up in you current JVM
>>>>     * the purpose is local debugging in an IDE (this allows to easily
>>>> set break points and debug code)
>>>>
>>>>   - pseudo-distributed mode: use FlinkSubmitter
>>>>     => you start up a local Flink cluster via bin/start-local.sh
>>>>     * this local Flink cluster run in an own JVM and looks like a real
>>>> cluster to the Flink client, ie, "bin/flink run"
>>>>     * thus, you just use FlinkSubmitter as for a real cluster (with
>>>> JobManager/Nimbus hostname "localhost")
>>>>     * in contrast to FlinkLocalCluster, no "internal Flink Cluster" is
>>>> started in your current JVM, but your code is shipped to the local
>>>> cluster you started up beforehand via bin/start-local.sh and executed in
>>>> this JVM
>>>>
>>>>   - distributed mode: use FlinkSubmitter
>>>>     => you start up Flink in a real cluster using bin/start-cluster.sh
>>>>     * you use "bin/flink run" to submit your code to the real cluster
>>>
>>> Thank you for the explanation, now I have clearer understanding of
>>> clusters and submitters. However my problem is not fixed yet. Here's
>>> my code:
>>>
>>> ////////////////////////////////////////////////////////////////////////////////
>>> // ./src/main/java/myexample/App.java
>>> ////////////////////////////////////////////////////////////////////////////////
>>>
>>> package myexample;
>>>
>>> import backtype.storm.Config;
>>> import backtype.storm.LocalCluster;
>>> import myexample.spout.StandaloneSpout;
>>> import backtype.storm.generated.StormTopology;
>>> import backtype.storm.topology.IRichSpout;
>>> import backtype.storm.topology.TopologyBuilder;
>>> import backtype.storm.topology.base.BaseBasicBolt;
>>>
>>> import myexample.bolt.Node;
>>> import myexample.bolt.StandardBolt;
>>>
>>> import java.util.Arrays;
>>> import java.util.List;
>>>
>>> //
>>> import org.apache.flink.storm.api.FlinkTopology;
>>> //import org.apache.flink.storm.api.FlinkLocalCluster;
>>> import org.apache.flink.storm.api.FlinkSubmitter;
>>> //import org.apache.flink.storm.api.FlinkClient;
>>> import org.apache.flink.storm.api.FlinkTopologyBuilder;
>>>
>>> public class App
>>> {
>>>     public static void main( String[] args ) throws Exception
>>>     {
>>>         int layer = 0;
>>>         StandaloneSpout spout = new StandaloneSpout();
>>>         Config conf = new Config();
>>>         conf.put(Config.TOPOLOGY_DEBUG, false);
>>>         //FlinkLocalCluster cluster = new FlinkLocalCluster();
>>>         //FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
>>>         //LocalCluster cluster = new LocalCluster();
>>>
>>>         layer = Integer.parseInt(args[0]);
>>>         //cluster.submitTopology("topology", conf,
>>> BinaryTopology(spout, layer));
>>>         FlinkSubmitter.submitTopology("topology", conf,
>>> BinaryTopology(spout, layer));
>>>         //Thread.sleep(5 * 1000);
>>>         //FlinkClient.getConfiguredClient(conf).killTopology("topology");
>>>         //cluster.killTopology("topology");
>>>         //cluster.shutdown();
>>>     }
>>>
>>>     public static FlinkTopology BinaryTopology(IRichSpout input, int n) {
>>>     //public static StormTopology BinaryTopology(IRichSpout input, int n) {
>>>         return BinaryTopology(input, n,
>>> Arrays.asList((BaseBasicBolt)new StandardBolt()));
>>>     }
>>>
>>>     public static FlinkTopology BinaryTopology(IRichSpout input, int
>>> n, List<BaseBasicBolt> boltList) {
>>>     //public static StormTopology BinaryTopology(IRichSpout input, int
>>> n, List<BaseBasicBolt> boltList) {
>>>         FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
>>>         //TopologyBuilder builder = new TopologyBuilder();
>>>         String sourceId = "src";
>>>         builder.setSpout(sourceId, input);
>>>
>>>
>>>         String boltId = "bolt";
>>>         builder.setBolt(boltId, new Node(), Math.pow(2,
>>> n)).shuffleGrouping(sourceId);
>>>
>>>         return builder.createTopology();
>>>     }
>>> }
>>>
>>> ////////////////////////////////////////////////////////////////////////////////
>>> // ./src/main/java/myexample/spout/StandaloneSpout.java
>>> ////////////////////////////////////////////////////////////////////////////////
>>>
>>> package myexample.spout;
>>>
>>> import backtype.storm.spout.SpoutOutputCollector;
>>> import backtype.storm.task.TopologyContext;
>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>> import backtype.storm.topology.base.BaseRichSpout;
>>> import backtype.storm.tuple.Fields;
>>> import backtype.storm.tuple.Values;
>>>
>>> import java.io.*;
>>> import java.text.DateFormat;
>>> import java.text.SimpleDateFormat;
>>> import java.util.*;
>>>
>>> public class StandaloneSpout extends BaseRichSpout {
>>>
>>>     private SpoutOutputCollector mCollector;
>>>
>>>     @Override
>>>     public void open(Map conf, TopologyContext context,
>>> SpoutOutputCollector collector) {
>>>         this.mCollector = collector;
>>>     }
>>>
>>>     @Override
>>>     public void nextTuple() {
>>>         long currentTime = System.currentTimeMillis();
>>>
>>>         // TODO: Currently, do not check bound of list, because of
>>> experiment.(Avoid branch)
>>>         mCollector.emit(new Values(new String("aaa"),
>>> System.currentTimeMillis(), 0));
>>>
>>>     }
>>>
>>>     @Override
>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>         System.out.println("declareOutputFields");
>>>         declarer.declare(new Fields("string1", "timestamp", "omitted"));
>>>     }
>>> }
>>>
>>> ////////////////////////////////////////////////////////////////////////////////
>>> // ./src/main/java/myexample/bolt/Node.java
>>> ////////////////////////////////////////////////////////////////////////////////
>>>
>>> package myexample.bolt;
>>>
>>> import backtype.storm.task.TopologyContext;
>>> import backtype.storm.topology.BasicOutputCollector;
>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>> import backtype.storm.topology.base.BaseBasicBolt;
>>> import backtype.storm.tuple.Fields;
>>> import backtype.storm.tuple.Tuple;
>>> import backtype.storm.tuple.Values;
>>> import java.util.Map;
>>>
>>> public class Node extends BaseBasicBolt {
>>>
>>>     public static boolean isTupleEmpty(Tuple tuple) {
>>>       return false;
>>>     }
>>>
>>>     @Override
>>>     public void prepare(Map stormConf, TopologyContext context) {
>>>         super.prepare(stormConf, context);
>>>     }
>>>
>>>     @Override
>>>     public void cleanup() {
>>>         super.cleanup();
>>>     }
>>>
>>>     @Override
>>>     public void execute(Tuple tuple, BasicOutputCollector collector) {
>>>         collector.emit(new Values("aaa", 1, System.currentTimeMillis(), 0));
>>>     }
>>>
>>>     @Override
>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>         declarer.declare(new Fields("string1", "string2", "timestamp",
>>> "omitted"));
>>>     }
>>> }
>>>
>>> ////////////////////////////////////////////////////////////////////////////////
>>> // ./src/main/java/myexample/bolt/StandardBolt.java
>>> ////////////////////////////////////////////////////////////////////////////////
>>>
>>> package myexample.bolt;
>>>
>>> import java.util.Map;
>>>
>>> import backtype.storm.task.TopologyContext;
>>> import backtype.storm.topology.BasicOutputCollector;
>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>> import backtype.storm.topology.base.BaseBasicBolt;
>>> import backtype.storm.tuple.Tuple;
>>>
>>> public class StandardBolt extends BaseBasicBolt {
>>>
>>>     @Override
>>>     public void prepare(Map stormConf, TopologyContext context) {
>>>         super.prepare(stormConf, context);
>>>     }
>>>
>>>     @Override
>>>     public void execute(Tuple tuple, BasicOutputCollector collector) {
>>>     }
>>>
>>>     @Override
>>>     public void declareOutputFields(OutputFieldsDeclarer ofd) {
>>>     }
>>> }
>>>
>>> Probably it is the source code which has the problem or other things
>>> around the project environment might contain the problem. I would
>>> really appreciate if you could verify whether the code looks ok or
>>> not.
>>>
>>>>
>>>> About further debugging: you can increase the log level to get more
>>>> information:
>>>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/logging.html
>>>
>>> I tried to inject `log4j.properties' file that I got from a sample
>>> flink-quickstart-java application created from `mvn
>>> archetype:generate' to a ./target/*.jar but it does not work. I tried
>>> this because placing that `log4j.properties' file under
>>> ./src/main/resources of my project did not work in the first place.
>>>
>>> Thank you again for your help.
>>> With best regards,
>>> Shinhyung
>>>
>>>> Hope this helps!
>>>>
>>>> -Matthias
>>>>
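For reference, a minimal log4j.properties along these lines (standard log4j 1.x syntax; the appender name and pattern are only a sketch):

    log4j.rootLogger=DEBUG, console
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %c - %m%n

Note that the JobManager and TaskManager started by bin/start-local.sh log via conf/log4j.properties of the Flink distribution (and bin/flink via conf/log4j-cli.properties), not via a file inside the job jar; that would explain why injecting one had no effect.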


Reply | Threaded
Open this post in threaded view
|

Re: Working with storm compatibility layer

Matthias J. Sax-2
In reply to this post by Shinhyung Yang
Just saw your email after my answer...

Have a look here about task slots.
https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/config.html#configuring-taskmanager-processing-slots

Also have a look here (starting from 18:30):
https://www.youtube.com/watch?v=UEkjRN8jRx4

-Matthias
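For reference, the setting in question lives in conf/flink-conf.yaml; a minimal sketch (the key is the real one, the value of 8 is only an illustration that happens to match this topology's maximum parallelism; a common rule of thumb is one slot per CPU core):

    taskmanager.numberOfTaskSlots: 8

The local cluster has to be stopped and restarted after editing the file for the new value to take effect, as Shinhyung does below.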


On 01/14/2016 12:05 PM, Shinhyung Yang wrote:

> Dear Matthias,
>
> Thank you very much again. I changed the value of
> taskmanager.numberOfTaskSlots from 1 to 128 in
> flink-0.10.1/conf/flink-conf.yaml, stopped the local cluster, and
> started it again. Now it works fine. (It is still running and I can
> see it clearly in the web frontend.) Although I'm not sure whether it
> is safe to keep the value this high.
>
> Thank you.
> Best regards,
> Shinhyung
>
> On Thu, Jan 14, 2016 at 7:53 PM, Shinhyung Yang
> <[hidden email]> wrote:
>> Dear Matthias,
>>
>> Thank you for the quick reply. It failed again; however, I was able to
>> access its WebFrontend and it gave me some logs. I wanted to share the
>> logs right away before digging into it.
>>
>> 19:48:18,011 INFO  org.apache.flink.runtime.jobmanager.JobManager
>>           - Submitting job 6f52281fb987a3def8d3c01e1fc0bdcb
>> (topology).
>> 19:48:18,014 INFO  org.apache.flink.runtime.jobmanager.JobManager
>>           - Scheduling job 6f52281fb987a3def8d3c01e1fc0bdcb
>> (topology).
>> 19:48:18,014 INFO  org.apache.flink.runtime.jobmanager.JobManager
>>           - Status of job 6f52281fb987a3def8d3c01e1fc0bdcb (topology)
>> changed to RUNNING.
>> 19:48:18,014 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
>> Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from
>> CREATED to SCHEDULED
>> 19:48:18,014 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
>> Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from
>> SCHEDULED to DEPLOYING
>> 19:48:18,015 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
>> Deploying Source: src (1/1) (attempt #0) to localhost
>> 19:48:18,015 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
>> (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from CREATED to
>> SCHEDULED
>> 19:48:18,015 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
>> (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from SCHEDULED to
>> DEPLOYING
>> 19:48:18,015 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
>> Deploying bolt (1/8) (attempt #0) to localhost
>> 19:48:18,015 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>>           - Received task Source: src (1/1)
>> 19:48:18,015 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
>> (2/8) (8c7c675e42deeef5e9ad17581bafe457) switched from CREATED to
>> SCHEDULED
>> 19:48:18,016 INFO  org.apache.flink.runtime.taskmanager.Task
>>           - Loading JAR files for task Source: src (1/1)
>> 19:48:18,017 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>>           - Received task bolt (1/8)
>> 19:48:18,017 INFO  org.apache.flink.runtime.blob.BlobCache
>>           - Downloading fac8ddfb4668c9f76559766001b7c9fd07cbd0bc from
>> localhost/127.0.0.1:36498
>> 19:48:18,017 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
>> Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from
>> DEPLOYING to CANCELING
>> 19:48:18,018 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
>> (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from DEPLOYING to
>> CANCELING
>> 19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
>>           - Attempting to cancel task Source: src (1/1)
>> 19:48:18,018 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
>> (2/8) (8c7c675e42deeef5e9ad17581bafe457) switched from SCHEDULED to
>> CANCELED
>> 19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
>>           - Source: src (1/1) switched to CANCELING
>> 19:48:18,018 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
>> (3/8) (0fed2e4059a37fe25b8b15480547a550) switched from CREATED to
>> CANCELED
>> 19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
>>           - Attempting to cancel task bolt (1/8)
>> 19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
>>           - bolt (1/8) switched to CANCELING
>> 19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
>>           - Loading JAR files for task bolt (1/8)
>> 19:48:18,018 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
>> (4/8) (647c524f0a73b936656d6c6a67ecfbc9) switched from CREATED to
>> CANCELED
>> 19:48:18,018 INFO  org.apache.flink.runtime.jobmanager.JobManager
>>           - Status of job 6f52281fb987a3def8d3c01e1fc0bdcb (topology)
>> changed to FAILING.
>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>> Not enough free slots available to run the job. You can decrease the
>> operator parallelism or increase the number of slots per TaskManager
>> in the configuration. Task to schedule: < Attempt #0 (bolt (2/8)) @
>> (unassigned) - [SCHEDULED] > with groupID <
>> 52431fe365bb7d0f12dd8d20e2528427 > in sharing group < SlotSharingGroup
>> [52431fe365bb7d0f12dd8d20e2528427, be3fd4192759bf8d5b39e028f005d7f4]
>>> . Resources available to scheduler: Number of instances=1, total
>> number of slots=1, available slots=0
>> at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:256)
>> at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleImmediately(Scheduler.java:131)
>> at org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:298)
>> at org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:458)
>> at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.scheduleAll(ExecutionJobVertex.java:322)
>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:686)
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:982)
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:962)
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:962)
>> at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> 19:48:18,019 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
>> (5/8) (4cb013017e278161124c3c6549cd3f80) switched from CREATED to
>> CANCELED
>> 19:48:18,020 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
>> (6/8) (a8737f034530b66a44e8c0a2cd60528d) switched from CREATED to
>> CANCELED
>> 19:48:18,020 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
>> (7/8) (495b895a7f338647b876640f91d823d6) switched from CREATED to
>> CANCELED
>> 19:48:18,020 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
>> (8/8) (0c45cccb3f5b786e4bdacd20d3e164c8) switched from CREATED to
>> CANCELED
>> 19:48:18,526 WARN  akka.remote.ReliableDeliverySupervisor
>>           - Association with remote system
>> [akka.tcp://flink@127.0.0.1:50370] has failed, address is now gated
>> for [5000] ms. Reason is: [Disassociated].
>> 19:48:18,527 WARN  akka.remote.ReliableDeliverySupervisor
>>           - Association with remote system
>> [akka.tcp://flink@127.0.0.1:54918] has failed, address is now gated
>> for [5000] ms. Reason is: [Disassociated].
>> 19:48:18,531 WARN  akka.remote.ReliableDeliverySupervisor
>>           - Association with remote system
>> [akka.tcp://flink@127.0.0.1:53543] has failed, address is now gated
>> for [5000] ms. Reason is: [Disassociated].
>> 19:48:18,768 INFO  org.apache.flink.runtime.taskmanager.Task
>>           - Source: src (1/1) switched to CANCELED
>> 19:48:18,768 INFO  org.apache.flink.runtime.taskmanager.Task
>>           - bolt (1/8) switched to CANCELED
>> 19:48:18,768 INFO  org.apache.flink.runtime.taskmanager.Task
>>           - Freeing task resources for Source: src (1/1)
>> 19:48:18,768 INFO  org.apache.flink.runtime.taskmanager.Task
>>           - Freeing task resources for bolt (1/8)
>> 19:48:18,769 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>>           - Unregistering task and sending final execution state
>> CANCELED to JobManager for task Source: src
>> (3535644576ae695d2685a65401e16fc4)
>> 19:48:18,769 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>>           - Unregistering task and sending final execution state
>> CANCELED to JobManager for task bolt
>> (391ac2875a2fdc86d8af4f2d51e3e849)
>> 19:48:18,770 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - bolt
>> (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from CANCELING to
>> CANCELED
>> 19:48:18,770 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
>> Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from
>> CANCELING to CANCELED
>> 19:48:18,773 INFO  org.apache.flink.runtime.jobmanager.JobManager
>>           - Status of job 6f52281fb987a3def8d3c01e1fc0bdcb (topology)
>> changed to FAILED.
>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>> Not enough free slots available to run the job. You can decrease the
>> operator parallelism or increase the number of slots per TaskManager
>> in the configuration. Task to schedule: < Attempt #0 (bolt (2/8)) @
>> (unassigned) - [SCHEDULED] > with groupID <
>> 52431fe365bb7d0f12dd8d20e2528427 > in sharing group < SlotSharingGroup
>> [52431fe365bb7d0f12dd8d20e2528427, be3fd4192759bf8d5b39e028f005d7f4]
>>> . Resources available to scheduler: Number of instances=1, total
>> number of slots=1, available slots=0
>> at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:256)
>> at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleImmediately(Scheduler.java:131)
>> at org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:298)
>> at org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:458)
>> at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.scheduleAll(ExecutionJobVertex.java:322)
>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:686)
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:982)
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:962)
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:962)
>> at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> From a quick look at the log, it seems like my setup has some problem?
>>
>> Thank you.
>> Best regards,
>> Shinhyung
>>
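The arithmetic behind the exception above: the bolt is scheduled at parallelism 8 (bolt (1/8) through (8/8) in the log, i.e. Math.pow(2, n) with n = 3 in the code quoted below), plus the spout at parallelism 1. With Flink's default slot sharing, a job needs as many task slots as its maximum operator parallelism, so this topology needs 8 slots while the single local TaskManager advertises only one ("Number of instances=1, total number of slots=1").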
>> On Thu, Jan 14, 2016 at 7:29 PM, Matthias J. Sax <[hidden email]> wrote:
>>> Hi,
>>>
>>> I can submit the topology without any problems. Your code is fine.
>>>
>>> If your program "exits silently", I would actually assume that you
>>> submitted the topology successfully. Can you see the topology in the
>>> JobManager WebFrontend? If not, do you see any errors in the log files?
>>>
>>> -Matthias
>>>
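For reference: with a cluster started via bin/start-local.sh, the JobManager WebFrontend listens on http://localhost:8081 by default (configurable as jobmanager.web.port in conf/flink-conf.yaml), and the JobManager and TaskManager write their log files into the log/ directory of the Flink distribution.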
>>> On 01/14/2016 07:37 AM, Shinhyung Yang wrote:
>>>> Dear Matthias,
>>>>
>>>> Thank you for the reply! I am sorry for the late response.
>>>>
>>>>> I just double-checked the Flink code: during translation from Storm
>>>>> to Flink, declareOutputFields() is called twice. You are right that it
>>>>> does the same job twice, but that is actually not a problem. The Flink
>>>>> code is cleaner this way, so I guess we will not change it.
>>>>
>>>> Thank you for checking. I don't think it contributed to my current
>>>> problem anyway. In my case, though, it is called 3 times, if the exact
>>>> number matters at all.
>>>>
>>>>> About the lifecycle:
>>>>> If you submit your code, then during deployment Spout.open() and
>>>>> Bolt.prepare() should be called for each parallel instance of each
>>>>> Spout/Bolt of your topology.
>>>>>
>>>>> About your submission (I guess this should solve your current problem):
>>>>> If you use bin/start-local.sh, you should *not* use FlinkLocalCluster,
>>>>> but FlinkSubmitter. You have to distinguish three cases:
>>>>>
>>>>>   - local/debug/IDE mode: use FlinkLocalCluster
>>>>>     => you do not need to start any Flink cluster beforehand --
>>>>> FlinkLocalCluster is started up in your current JVM
>>>>>     * the purpose is local debugging in an IDE (this makes it easy to
>>>>> set breakpoints and debug code)
>>>>>
>>>>>   - pseudo-distributed mode: use FlinkSubmitter
>>>>>     => you start up a local Flink cluster via bin/start-local.sh
>>>>>     * this local Flink cluster runs in its own JVM and looks like a real
>>>>> cluster to the Flink client, i.e., "bin/flink run"
>>>>>     * thus, you just use FlinkSubmitter as for a real cluster (with
>>>>> JobManager/Nimbus hostname "localhost")
>>>>>     * in contrast to FlinkLocalCluster, no "internal Flink cluster" is
>>>>> started in your current JVM; instead, your code is shipped to the local
>>>>> cluster you started beforehand via bin/start-local.sh and executed in
>>>>> that JVM
>>>>>
>>>>>   - distributed mode: use FlinkSubmitter
>>>>>     => you start up Flink in a real cluster using bin/start-cluster.sh
>>>>>     * you use "bin/flink run" to submit your code to the real cluster
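A minimal sketch of the local/debug variant, using only the calls that also appear (commented out) in the code quoted below; the topology name "topology" and the builder are placeholders:

    Config conf = new Config();
    FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
    cluster.submitTopology("topology", conf, builder.createTopology());
    Thread.sleep(5 * 1000);            // let the topology run for a while
    cluster.killTopology("topology");  // then tear it down
    cluster.shutdown();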
>>>>
>>>> Thank you for the explanation; now I have a clearer understanding of
>>>> clusters and submitters. However, my problem is not fixed yet. Here's
>>>> my code:
>>>>
>>>> ////////////////////////////////////////////////////////////////////////////////
>>>> // ./src/main/java/myexample/App.java
>>>> ////////////////////////////////////////////////////////////////////////////////
>>>>
>>>> package myexample;
>>>>
>>>> import backtype.storm.Config;
>>>> import backtype.storm.LocalCluster;
>>>> import myexample.spout.StandaloneSpout;
>>>> import backtype.storm.generated.StormTopology;
>>>> import backtype.storm.topology.IRichSpout;
>>>> import backtype.storm.topology.TopologyBuilder;
>>>> import backtype.storm.topology.base.BaseBasicBolt;
>>>>
>>>> import myexample.bolt.Node;
>>>> import myexample.bolt.StandardBolt;
>>>>
>>>> import java.util.Arrays;
>>>> import java.util.List;
>>>>
>>>> //
>>>> import org.apache.flink.storm.api.FlinkTopology;
>>>> //import org.apache.flink.storm.api.FlinkLocalCluster;
>>>> import org.apache.flink.storm.api.FlinkSubmitter;
>>>> //import org.apache.flink.storm.api.FlinkClient;
>>>> import org.apache.flink.storm.api.FlinkTopologyBuilder;
>>>>
>>>> public class App
>>>> {
>>>>     public static void main( String[] args ) throws Exception
>>>>     {
>>>>         int layer = 0;
>>>>         StandaloneSpout spout = new StandaloneSpout();
>>>>         Config conf = new Config();
>>>>         conf.put(Config.TOPOLOGY_DEBUG, false);
>>>>         //FlinkLocalCluster cluster = new FlinkLocalCluster();
>>>>         //FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
>>>>         //LocalCluster cluster = new LocalCluster();
>>>>
>>>>         layer = Integer.parseInt(args[0]);
>>>>         //cluster.submitTopology("topology", conf, BinaryTopology(spout, layer));
>>>>         FlinkSubmitter.submitTopology("topology", conf, BinaryTopology(spout, layer));
>>>>         //Thread.sleep(5 * 1000);
>>>>         //FlinkClient.getConfiguredClient(conf).killTopology("topology");
>>>>         //cluster.killTopology("topology");
>>>>         //cluster.shutdown();
>>>>     }
>>>>
>>>>     public static FlinkTopology BinaryTopology(IRichSpout input, int n) {
>>>>     //public static StormTopology BinaryTopology(IRichSpout input, int n) {
>>>>         return BinaryTopology(input, n, Arrays.asList((BaseBasicBolt) new StandardBolt()));
>>>>     }
>>>>
>>>>     public static FlinkTopology BinaryTopology(IRichSpout input, int n, List<BaseBasicBolt> boltList) {
>>>>     //public static StormTopology BinaryTopology(IRichSpout input, int n, List<BaseBasicBolt> boltList) {
>>>>         FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
>>>>         //TopologyBuilder builder = new TopologyBuilder();
>>>>         String sourceId = "src";
>>>>         builder.setSpout(sourceId, input);
>>>>
>>>>
>>>>         String boltId = "bolt";
>>>>         builder.setBolt(boltId, new Node(), Math.pow(2, n)).shuffleGrouping(sourceId);
>>>>
>>>>         return builder.createTopology();
>>>>     }
>>>> }
>>>>
>>>> ////////////////////////////////////////////////////////////////////////////////
>>>> // ./src/main/java/myexample/spout/StandaloneSpout.java
>>>> ////////////////////////////////////////////////////////////////////////////////
>>>>
>>>> package myexample.spout;
>>>>
>>>> import backtype.storm.spout.SpoutOutputCollector;
>>>> import backtype.storm.task.TopologyContext;
>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>> import backtype.storm.topology.base.BaseRichSpout;
>>>> import backtype.storm.tuple.Fields;
>>>> import backtype.storm.tuple.Values;
>>>>
>>>> import java.io.*;
>>>> import java.text.DateFormat;
>>>> import java.text.SimpleDateFormat;
>>>> import java.util.*;
>>>>
>>>> public class StandaloneSpout extends BaseRichSpout {
>>>>
>>>>     private SpoutOutputCollector mCollector;
>>>>
>>>>     @Override
>>>>     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
>>>>         this.mCollector = collector;
>>>>     }
>>>>
>>>>     @Override
>>>>     public void nextTuple() {
>>>>         long currentTime = System.currentTimeMillis();
>>>>
>>>>         // TODO: Currently, do not check the bounds of the list, because of the experiment (avoid a branch).
>>>>         mCollector.emit(new Values(new String("aaa"), System.currentTimeMillis(), 0));
>>>>
>>>>     }
>>>>
>>>>     @Override
>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>         System.out.println("declareOutputFields");
>>>>         declarer.declare(new Fields("string1", "timestamp", "omitted"));
>>>>     }
>>>> }
>>>>
>>>> ////////////////////////////////////////////////////////////////////////////////
>>>> // ./src/main/java/myexample/bolt/Node.java
>>>> ////////////////////////////////////////////////////////////////////////////////
>>>>
>>>> package myexample.bolt;
>>>>
>>>> import backtype.storm.task.TopologyContext;
>>>> import backtype.storm.topology.BasicOutputCollector;
>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>> import backtype.storm.topology.base.BaseBasicBolt;
>>>> import backtype.storm.tuple.Fields;
>>>> import backtype.storm.tuple.Tuple;
>>>> import backtype.storm.tuple.Values;
>>>> import java.util.Map;
>>>>
>>>> public class Node extends BaseBasicBolt {
>>>>
>>>>     public static boolean isTupleEmpty(Tuple tuple) {
>>>>       return false;
>>>>     }
>>>>
>>>>     @Override
>>>>     public void prepare(Map stormConf, TopologyContext context) {
>>>>         super.prepare(stormConf, context);
>>>>     }
>>>>
>>>>     @Override
>>>>     public void cleanup() {
>>>>         super.cleanup();
>>>>     }
>>>>
>>>>     @Override
>>>>     public void execute(Tuple tuple, BasicOutputCollector collector) {
>>>>         collector.emit(new Values("aaa", 1, System.currentTimeMillis(), 0));
>>>>     }
>>>>
>>>>     @Override
>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>         declarer.declare(new Fields("string1", "string2", "timestamp", "omitted"));
>>>>     }
>>>> }
>>>>
>>>> ////////////////////////////////////////////////////////////////////////////////
>>>> // ./src/main/java/myexample/bolt/StandardBolt.java
>>>> ////////////////////////////////////////////////////////////////////////////////
>>>>
>>>> package myexample.bolt;
>>>>
>>>> import java.util.Map;
>>>>
>>>> import backtype.storm.task.TopologyContext;
>>>> import backtype.storm.topology.BasicOutputCollector;
>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>> import backtype.storm.topology.base.BaseBasicBolt;
>>>> import backtype.storm.tuple.Tuple;
>>>>
>>>> public class StandardBolt extends BaseBasicBolt {
>>>>
>>>>     @Override
>>>>     public void prepare(Map stormConf, TopologyContext context) {
>>>>         super.prepare(stormConf, context);
>>>>     }
>>>>
>>>>     @Override
>>>>     public void execute(Tuple tuple, BasicOutputCollector collector) {
>>>>     }
>>>>
>>>>     @Override
>>>>     public void declareOutputFields(OutputFieldsDeclarer ofd) {
>>>>     }
>>>> }
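As an aside to the earlier question about checking logs without println(): a minimal sketch of SLF4J logging inside StandaloneSpout (slf4j-api is already on Flink's classpath; the logger field and message are illustrative only):

    private static final org.slf4j.Logger LOG =
            org.slf4j.LoggerFactory.getLogger(StandaloneSpout.class);

    // e.g. at the end of open():
    LOG.info("open() called for task {}", context.getThisTaskId());

Messages logged this way end up in the TaskManager's file under log/ of the Flink distribution rather than on the client console.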
>>>>
>>>> Probably the source code has the problem, or something else in the
>>>> project environment does. I would really appreciate it if you could
>>>> verify whether the code looks OK.
>>>>
>>>>>
>>>>> About further debugging: you can increase the log level to get more
>>>>> information:
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/logging.html
>>>>
>>>> I tried to inject the `log4j.properties' file that I got from a sample
>>>> flink-quickstart-java application (created with `mvn
>>>> archetype:generate') into the ./target/*.jar, but it did not work. I
>>>> tried this because placing that `log4j.properties' file under
>>>> ./src/main/resources of my project did not work in the first place.
>>>>
>>>> Thank you again for your help.
>>>> With best regards,
>>>> Shinhyung
>>>>
>>>>> Hope this helps!
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 01/09/2016 04:38 PM, Shinhyung Yang wrote:
>>>>>> Dear Matthias,
>>>>>>
>>>>>> Thank you for replying!
>>>>>>
>>>>>>     that sounds weird and should not happen -- Spout.open() should get
>>>>>>     called exactly once.
>>>>>>
>>>>>>
>>>>>> That's what I thought too. I'm new to both Storm and Flink, so it's quite
>>>>>> complicated for me to handle both yet; would it help if I knew Storm's
>>>>>> lifecycle and Flink's lifecycle? When submitTopology() is invoked, what
>>>>>> should be called other than spout.open()?
>>>>>>
>>>>>>     I am not sure about multiple calls to
>>>>>>     declareOutputFields though -- it might be called multiple times -- would
>>>>>>     need to double-check the code.
>>>>>>
>>>>>>
>>>>>> I'll check my code too.
>>>>>>
>>>>>>
>>>>>>     However, the call to declareOutputFields should be idempotent, so it
>>>>>>     should actually not be a problem if it is called multiple times. Even if
>>>>>>     Storm calls this method only once, there is no guarantee that it is not
>>>>>>     called multiple times. If this is a problem for you, please let me know.
>>>>>>     I think we could fix this and make sure the method is only called once.
>>>>>>
>>>>>>
>>>>>> Actually it doesn't seem to be a problem for now. It just does the same
>>>>>> job multiple times.
>>>>>>
>>>>>>
>>>>>>     It would be helpful if you could share your code. What do you mean by
>>>>>>     "exits silently"? No submission happens? Did you check the logs? As you
>>>>>>     mentioned FlinkLocalCluster, I assume that you run within an IDE?
>>>>>>
>>>>>>
>>>>>> The topology doesn't seem to continue. There's a set of initialization
>>>>>> code in the open method of the program's spout, and it looks hopeless if
>>>>>> that is not invoked. Is there any way to check the logs other than using
>>>>>> println() calls? I'm running it on the command line, with
>>>>>> `bin/start-local.sh' running in the background and `bin/flink run'.
>>>>>>
>>>>>>
>>>>>>     Btw: lately we fixed a couple of bugs. I would suggest that you use the
>>>>>>     latest version from the Flink master branch. It should work with 0.10.1
>>>>>>     without problems.
>>>>>>
>>>>>>
>>>>>> It was very tedious for me to deal with the pom.xml file and the .m2
>>>>>> repository, so I preferred to use Maven Central. But I will try the
>>>>>> master branch if I have to.
>>>>>>
>>>>>> I will quickly check whether I can share some of the code.
>>>>>>
>>>>>> Thank you again for the help!
>>>>>> With best regards,
>>>>>> Shinhyung Yang
>>>>>>
>>>>>>
>>>>>>
>>>>>>     -Matthias
>>>>>>
>>>>>>
>>>>>>
>>>>>>     On 01/09/2016 01:27 AM, Shinhyung Yang wrote:
>>>>>>     > Howdies to everyone,
>>>>>>     >
>>>>>>     > I'm trying to use the storm compatibility layer on Flink 0.10.1. The
>>>>>>     > original storm topology works fine on Storm 0.9.5 and I have
>>>>>>     > incorporated FlinkLocalCluster, FlinkTopologyBuilder, and
>>>>>>     > FlinkTopology classes according to the programming guide
>>>>>>     >
>>>>>>     (https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/storm_compatibility.html).
>>>>>>     > I'm running it on Oracle Java 8 (1.8.0_66-b17) on Centos 7 (7.2.1511).
>>>>>>     > What happens is, it seems to be going all the way to submitTopology
>>>>>>     > method without any problem, however it doesn't invoke open method of
>>>>>>     > Spout class but declareOutputFields method is called for multiple
>>>>>>     > times and the program exits silently. Do you guys have any idea what's
>>>>>>     > going on here or have any suggestions? If needed, then please ask me
>>>>>>     > for more information.
>>>>>>     >
>>>>>>     > Thank you for reading.
>>>>>>     > With best regards,
>>>>>>     > Shinhyung Yang
>>>>>>     >
>>>>>>
>>>>>
>>>

