Re: Working with storm compatibility layer

Posted by Shinhyung Yang on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Working-with-storm-compatibility-layer-tp4205p4279.html

Dear Matthias,

Thank you for the reply, and I am sorry for responding so late.

> I just double-checked the Flink code, and during translation from Storm
> to Flink, declareOutputFields() is called twice. You are right that it
> does the same job twice, but that is actually not a problem. The Flink
> code is cleaner this way, so I guess we will not change it.

Thank you for checking. I don't think it contributes to my current
problem anyway. In my case, though, it is called three times, if the
number matters at all.
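Why repeated calls are harmless can be illustrated with a minimal, JDK-only sketch. It assumes a hypothetical RecordingDeclarer standing in for Storm's OutputFieldsDeclarer (it is not the Storm or Flink API): as long as declareOutputFields() only declares fields and has no other side effects, calling it once or three times leaves the same schema behind. The println in StandaloneSpout.declareOutputFields() is such a side effect, which is why it shows up once per call.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Storm's OutputFieldsDeclarer: it only records
// which fields are declared on which stream. Re-declaring the same stream
// overwrites the entry with an identical value.
class RecordingDeclarer {
    private final Map<String, List<String>> schemas = new LinkedHashMap<>();

    void declare(String... fields) {
        declareStream("default", fields);
    }

    void declareStream(String streamId, String... fields) {
        schemas.put(streamId, Collections.unmodifiableList(Arrays.asList(fields)));
    }

    Map<String, List<String>> schemas() {
        return Collections.unmodifiableMap(schemas);
    }
}

// Mirrors StandaloneSpout.declareOutputFields() without the println:
// the method only describes the schema, so it is idempotent.
class IdempotentSpoutSchema {
    static void declareOutputFields(RecordingDeclarer declarer) {
        declarer.declare("string1", "timestamp", "omitted");
    }

    // Calls declareOutputFields() n times on one declarer and returns the result.
    static Map<String, List<String>> declareNTimes(int n) {
        RecordingDeclarer declarer = new RecordingDeclarer();
        for (int i = 0; i < n; i++) {
            declareOutputFields(declarer);
        }
        return declarer.schemas();
    }
}
```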

> About lifecycle:
> If you submit your code, during deployment, Spout.open() and
> Bolt.prepare() should be called for each parallel instance of each
> Spout/Bolt of your topology.
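The expected call counts can be sketched with a small JDK-only simulation. LifecycleComponent, CountingComponent, and MiniDeployer are hypothetical names; setup() stands in for Spout.open()/Bolt.prepare(), and a real runtime creates the instances in separate tasks rather than in a loop. For example, a spout with parallelism 1 and a bolt with parallelism 2^3 = 8 should see one open() call and eight prepare() calls in total.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for the Spout.open() / Bolt.prepare() hook.
interface LifecycleComponent {
    void setup();
}

// Counts setup() calls across all instances of one component and
// fails if a single instance is set up twice.
class CountingComponent implements LifecycleComponent {
    private final AtomicInteger setupCalls;
    private boolean initialized = false;

    CountingComponent(AtomicInteger setupCalls) {
        this.setupCalls = setupCalls;
    }

    @Override
    public void setup() {
        if (initialized) {
            throw new IllegalStateException("setup() called twice on one instance");
        }
        initialized = true;
        setupCalls.incrementAndGet();
    }
}

// Hypothetical deployer: creates `parallelism` instances and calls setup()
// exactly once on each. A real runtime does this in separate tasks, not a loop.
class MiniDeployer {
    static List<LifecycleComponent> deploy(Supplier<LifecycleComponent> factory, int parallelism) {
        List<LifecycleComponent> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            LifecycleComponent instance = factory.get();
            instance.setup();
            instances.add(instance);
        }
        return instances;
    }
}
```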
>
> About your submission (I guess this should solve your current problem):
> If you use bin/start-local.sh, you should *not* use FlinkLocalCluster,
> but FlinkSubmitter. You have to distinguish three cases:
>
>   - local/debug/IDE mode: use FlinkLocalCluster
>     => you do not need to start any Flink cluster beforehand;
>        FlinkLocalCluster is started up in your current JVM
>     * the purpose is local debugging in an IDE (this makes it easy to
>       set breakpoints and debug code)
>
>   - pseudo-distributed mode: use FlinkSubmitter
>     => you start up a local Flink cluster via bin/start-local.sh
>     * this local Flink cluster runs in its own JVM and looks like a real
>       cluster to the Flink client, i.e., "bin/flink run"
>     * thus, you just use FlinkSubmitter as for a real cluster (with
>       JobManager/Nimbus hostname "localhost")
>     * in contrast to FlinkLocalCluster, no "internal Flink cluster" is
>       started in your current JVM; instead, your code is shipped to the
>       local cluster you started beforehand via bin/start-local.sh and
>       executed in that JVM
>
>   - distributed mode: use FlinkSubmitter
>     => you start up Flink in a real cluster using bin/start-cluster.sh
>     * you use "bin/flink run" to submit your code to the real cluster
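The three cases can be summarized as a small lookup table. This is a minimal JDK-only sketch: the enum DeploymentMode and the flag values local, pseudo, and cluster are hypothetical, and nothing here calls Flink (the entry points are only named as strings). Under this mapping, running bin/start-local.sh plus bin/flink run is PSEUDO_DISTRIBUTED, so the program should go through FlinkSubmitter rather than FlinkLocalCluster.

```java
// Hypothetical summary of the three submission cases; nothing here calls
// Flink, the entry points are only named as strings.
enum DeploymentMode {
    LOCAL("FlinkLocalCluster", false, null),
    PSEUDO_DISTRIBUTED("FlinkSubmitter", true, "bin/start-local.sh"),
    DISTRIBUTED("FlinkSubmitter", true, "bin/start-cluster.sh");

    final String entryPoint;            // class the topology is submitted through
    final boolean needsRunningCluster;  // must a cluster be started beforehand?
    final String startScript;           // script that starts it, or null

    DeploymentMode(String entryPoint, boolean needsRunningCluster, String startScript) {
        this.entryPoint = entryPoint;
        this.needsRunningCluster = needsRunningCluster;
        this.startScript = startScript;
    }

    // Maps a hypothetical command-line flag to a mode.
    static DeploymentMode fromFlag(String flag) {
        switch (flag) {
            case "local":
                return LOCAL;
            case "pseudo":
                return PSEUDO_DISTRIBUTED;
            case "cluster":
                return DISTRIBUTED;
            default:
                throw new IllegalArgumentException("unknown mode: " + flag);
        }
    }
}
```

A main() method could branch on the chosen mode's entryPoint to keep one code path for IDE debugging (FlinkLocalCluster) and one for submission to a running cluster (FlinkSubmitter).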

Thank you for the explanation; now I have a clearer understanding of
clusters and submitters. However, my problem is not fixed yet. Here's
my code:

////////////////////////////////////////////////////////////////////////////////
// ./src/main/java/myexample/App.java
////////////////////////////////////////////////////////////////////////////////

package myexample;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import myexample.spout.StandaloneSpout;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;

import myexample.bolt.Node;
import myexample.bolt.StandardBolt;

import java.util.Arrays;
import java.util.List;

//
import org.apache.flink.storm.api.FlinkTopology;
//import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkSubmitter;
//import org.apache.flink.storm.api.FlinkClient;
import org.apache.flink.storm.api.FlinkTopologyBuilder;

public class App
{
    public static void main( String[] args ) throws Exception
    {
        int layer = 0;
        StandaloneSpout spout = new StandaloneSpout();
        Config conf = new Config();
        conf.put(Config.TOPOLOGY_DEBUG, false);
        //FlinkLocalCluster cluster = new FlinkLocalCluster();
        //FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
        //LocalCluster cluster = new LocalCluster();

        layer = Integer.parseInt(args[0]);
        //cluster.submitTopology("topology", conf, BinaryTopology(spout, layer));
        FlinkSubmitter.submitTopology("topology", conf,
                BinaryTopology(spout, layer));
        //Thread.sleep(5 * 1000);
        //FlinkClient.getConfiguredClient(conf).killTopology("topology");
        //cluster.killTopology("topology");
        //cluster.shutdown();
    }

    public static FlinkTopology BinaryTopology(IRichSpout input, int n) {
    //public static StormTopology BinaryTopology(IRichSpout input, int n) {
        return BinaryTopology(input, n,
                Arrays.asList((BaseBasicBolt) new StandardBolt()));
    }

    public static FlinkTopology BinaryTopology(IRichSpout input, int n,
            List<BaseBasicBolt> boltList) {
    //public static StormTopology BinaryTopology(IRichSpout input, int n, List<BaseBasicBolt> boltList) {
        FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
        //TopologyBuilder builder = new TopologyBuilder();
        String sourceId = "src";
        builder.setSpout(sourceId, input);


        String boltId = "bolt";
        builder.setBolt(boltId, new Node(), Math.pow(2, n))
                .shuffleGrouping(sourceId);

        return builder.createTopology();
    }
}

////////////////////////////////////////////////////////////////////////////////
// ./src/main/java/myexample/spout/StandaloneSpout.java
////////////////////////////////////////////////////////////////////////////////

package myexample.spout;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.util.Map;

public class StandaloneSpout extends BaseRichSpout {

    private SpoutOutputCollector mCollector;

    @Override
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        this.mCollector = collector;
    }

    @Override
    public void nextTuple() {
        long currentTime = System.currentTimeMillis();

        // TODO: Bounds of the list are deliberately not checked for this
        // experiment (to avoid a branch).
        mCollector.emit(new Values("aaa", currentTime, 0));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        System.out.println("declareOutputFields");
        declarer.declare(new Fields("string1", "timestamp", "omitted"));
    }
}

////////////////////////////////////////////////////////////////////////////////
// ./src/main/java/myexample/bolt/Node.java
////////////////////////////////////////////////////////////////////////////////

package myexample.bolt;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;

public class Node extends BaseBasicBolt {

    public static boolean isTupleEmpty(Tuple tuple) {
        return false;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        super.prepare(stormConf, context);
    }

    @Override
    public void cleanup() {
        super.cleanup();
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        collector.emit(new Values("aaa", 1, System.currentTimeMillis(), 0));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("string1", "string2", "timestamp",
                "omitted"));
    }
}

////////////////////////////////////////////////////////////////////////////////
// ./src/main/java/myexample/bolt/StandardBolt.java
////////////////////////////////////////////////////////////////////////////////

package myexample.bolt;

import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class StandardBolt extends BaseBasicBolt {

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        super.prepare(stormConf, context);
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer ofd) {
    }
}

The problem may be in the source code itself, or in something else in
the project environment. I would really appreciate it if you could
check whether the code looks OK.
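One detail in App.java that may be worth tightening: the bolt's parallelism hint is computed with Math.pow(2, n), which returns a double, and main() reads args[0] without checking that it exists. Below is a minimal sketch of integer-only helpers; the class LayerParallelism and its methods are hypothetical, not part of Storm or Flink.

```java
// Hypothetical helpers for App.java: bolt parallelism 2^layer computed with
// integer arithmetic, plus argument parsing with a readable error.
final class LayerParallelism {
    private LayerParallelism() {}

    // Returns 2^layer. 1 << 31 overflows int, so layer is limited to 0..30.
    static int boltParallelism(int layer) {
        if (layer < 0 || layer > 30) {
            throw new IllegalArgumentException("layer must be in [0, 30], got " + layer);
        }
        return 1 << layer;
    }

    // Parses the first command-line argument as App.main does,
    // but fails with a usage message when it is missing.
    static int parseLayer(String[] args) {
        if (args.length < 1) {
            throw new IllegalArgumentException("usage: App <layer>");
        }
        return Integer.parseInt(args[0]);
    }
}
```

In BinaryTopology, the call could then read builder.setBolt(boltId, new Node(), LayerParallelism.boltParallelism(n)), assuming setBolt accepts any Number as the hint, as the existing Math.pow call suggests.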

>
> About further debugging: you can increase the log level to get more
> information:
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/logging.html

I first placed the `log4j.properties' file from a sample
flink-quickstart-java application (created with `mvn
archetype:generate') under ./src/main/resources of my project, but it
did not work. I then tried injecting the same file into ./target/*.jar,
but that did not work either.

Thank you again for your help.
With best regards,
Shinhyung

> Hope this helps!
>
> -Matthias
>
> On 01/09/2016 04:38 PM, Shinhyung Yang wrote:
>> Dear Matthias,
>>
>> Thank you for replying!
>>
>>     That sounds weird and should not happen; Spout.open() should get
>>     called exactly once.
>>
>>
>> That's what I thought too. I'm new to both Storm and Flink, so handling
>> both is still quite complicated for me. Would it help me to know Storm's
>> lifecycle and Flink's lifecycle? When submitTopology() is invoked, what
>> should be called other than spout.open()?
>>
>>     I am not sure about multiple calls to declareOutputFields, though;
>>     it might be called multiple times. I would need to double-check the
>>     code.
>>
>>
>> I'll check my code too.
>>
>>
>>     However, the call to declareOutputFields should be idempotent, so it
>>     should actually not be a problem if it is called multiple times. Even if
>>     Storm might call this method only once, there is no guarantee that it is
>>     not called multiple times. If this is a problem for you, please let me
>>     know. I think we could fix this and make sure the method is only called
>>     once.
>>
>>
>> Actually it doesn't seem to be a problem for now. It just does the same
>> job multiple times.
>>
>>
>>     It would be helpful if you could share your code. What do you mean by
>>     "exits silently"? No submission happens? Did you check the logs? As you
>>     mentioned FlinkLocalCluster, I assume that you run within an IDE?
>>
>>
>> The topology doesn't seem to continue. There is some initialization
>> code in the open method of the program's spout, and nothing can proceed
>> if it is not invoked. Is there any way to check the logs other than
>> using println() calls? I'm running it on the command line, with
>> `bin/start-local.sh' running in the background, using `bin/flink run'.
>>
>>
>>     Btw: we recently fixed a couple of bugs. I would suggest that you use
>>     the latest version from the Flink master branch. It should work with
>>     0.10.1 without problems.
>>
>>
>> It was very tedious for me to deal with a pom.xml file and the .m2
>> repository, so I preferred to use Maven Central. But I will try the
>> master branch if I have to.
>>
>> I will quickly check whether I can share some of the code.
>>
>> Thank you again for the help!
>> With best regards,
>> Shinhyung Yang
>>
>>
>>
>>     -Matthias
>>
>>
>>
>>     On 01/09/2016 01:27 AM, Shinhyung Yang wrote:
>>     > Howdies to everyone,
>>     >
>>     > I'm trying to use the Storm compatibility layer on Flink 0.10.1. The
>>     > original Storm topology works fine on Storm 0.9.5, and I have
>>     > incorporated the FlinkLocalCluster, FlinkTopologyBuilder, and
>>     > FlinkTopology classes according to the programming guide
>>     > (https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/storm_compatibility.html).
>>     > I'm running it on Oracle Java 8 (1.8.0_66-b17) on CentOS 7 (7.2.1511).
>>     > What happens is that it seems to get all the way to the submitTopology
>>     > method without any problem; however, it doesn't invoke the open method
>>     > of the Spout class, while the declareOutputFields method is called
>>     > multiple times, and then the program exits silently. Do you have any
>>     > idea what's going on here, or any suggestions? If needed, please ask
>>     > me for more information.
>>     >
>>     > Thank you for reading.
>>     > With best regards,
>>     > Shinhyung Yang
>>     >
>>
>