http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Working-with-storm-compatibility-layer-tp4205p4289.html
Thank you very much again. I changed the value of
started local cluster again. And it works fine and well. (It is still
> Dear Matthias,
>
> Thank you for the quick reply. It failed again; however, I was able to
> access its WebFrontend, and it gave me some logs. I wanted to show you
> the logs right away, before digging into them.
>
> 19:48:18,011 INFO org.apache.flink.runtime.jobmanager.JobManager
> - Submitting job 6f52281fb987a3def8d3c01e1fc0bdcb
> (topology).
> 19:48:18,014 INFO org.apache.flink.runtime.jobmanager.JobManager
> - Scheduling job 6f52281fb987a3def8d3c01e1fc0bdcb
> (topology).
> 19:48:18,014 INFO org.apache.flink.runtime.jobmanager.JobManager
> - Status of job 6f52281fb987a3def8d3c01e1fc0bdcb (topology)
> changed to RUNNING.
> 19:48:18,014 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph -
> Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from
> CREATED to SCHEDULED
> 19:48:18,014 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph -
> Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from
> SCHEDULED to DEPLOYING
> 19:48:18,015 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph -
> Deploying Source: src (1/1) (attempt #0) to localhost
> 19:48:18,015 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt
> (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from CREATED to
> SCHEDULED
> 19:48:18,015 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt
> (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from SCHEDULED to
> DEPLOYING
> 19:48:18,015 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph -
> Deploying bolt (1/8) (attempt #0) to localhost
> 19:48:18,015 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - Received task Source: src (1/1)
> 19:48:18,015 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt
> (2/8) (8c7c675e42deeef5e9ad17581bafe457) switched from CREATED to
> SCHEDULED
> 19:48:18,016 INFO org.apache.flink.runtime.taskmanager.Task
> - Loading JAR files for task Source: src (1/1)
> 19:48:18,017 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - Received task bolt (1/8)
> 19:48:18,017 INFO org.apache.flink.runtime.blob.BlobCache
> - Downloading fac8ddfb4668c9f76559766001b7c9fd07cbd0bc from
> localhost/127.0.0.1:36498
> 19:48:18,017 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph -
> Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from
> DEPLOYING to CANCELING
> 19:48:18,018 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt
> (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from DEPLOYING to
> CANCELING
> 19:48:18,018 INFO org.apache.flink.runtime.taskmanager.Task
> - Attempting to cancel task Source: src (1/1)
> 19:48:18,018 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt
> (2/8) (8c7c675e42deeef5e9ad17581bafe457) switched from SCHEDULED to
> CANCELED
> 19:48:18,018 INFO org.apache.flink.runtime.taskmanager.Task
> - Source: src (1/1) switched to CANCELING
> 19:48:18,018 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt
> (3/8) (0fed2e4059a37fe25b8b15480547a550) switched from CREATED to
> CANCELED
> 19:48:18,018 INFO org.apache.flink.runtime.taskmanager.Task
> - Attempting to cancel task bolt (1/8)
> 19:48:18,018 INFO org.apache.flink.runtime.taskmanager.Task
> - bolt (1/8) switched to CANCELING
> 19:48:18,018 INFO org.apache.flink.runtime.taskmanager.Task
> - Loading JAR files for task bolt (1/8)
> 19:48:18,018 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt
> (4/8) (647c524f0a73b936656d6c6a67ecfbc9) switched from CREATED to
> CANCELED
> 19:48:18,018 INFO org.apache.flink.runtime.jobmanager.JobManager
> - Status of job 6f52281fb987a3def8d3c01e1fc0bdcb (topology)
> changed to FAILING.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Not enough free slots available to run the job. You can decrease the
> operator parallelism or increase the number of slots per TaskManager
> in the configuration. Task to schedule: < Attempt #0 (bolt (2/8)) @
> (unassigned) - [SCHEDULED] > with groupID <
> 52431fe365bb7d0f12dd8d20e2528427 > in sharing group < SlotSharingGroup
> [52431fe365bb7d0f12dd8d20e2528427, be3fd4192759bf8d5b39e028f005d7f4]
>>. Resources available to scheduler: Number of instances=1, total
> number of slots=1, available slots=0
> at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:256)
> at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleImmediately(Scheduler.java:131)
> at org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:298)
> at org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:458)
> at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.scheduleAll(ExecutionJobVertex.java:322)
> at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:686)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:982)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:962)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:962)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 19:48:18,019 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt
> (5/8) (4cb013017e278161124c3c6549cd3f80) switched from CREATED to
> CANCELED
> 19:48:18,020 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt
> (6/8) (a8737f034530b66a44e8c0a2cd60528d) switched from CREATED to
> CANCELED
> 19:48:18,020 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt
> (7/8) (495b895a7f338647b876640f91d823d6) switched from CREATED to
> CANCELED
> 19:48:18,020 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt
> (8/8) (0c45cccb3f5b786e4bdacd20d3e164c8) switched from CREATED to
> CANCELED
> 19:48:18,526 WARN akka.remote.ReliableDeliverySupervisor
> - Association with remote system
> [akka.tcp://flink@127.0.0.1:50370] has failed, address is now gated
> for [5000] ms. Reason is: [Disassociated].
> 19:48:18,527 WARN akka.remote.ReliableDeliverySupervisor
> - Association with remote system
> [akka.tcp://flink@127.0.0.1:54918] has failed, address is now gated
> for [5000] ms. Reason is: [Disassociated].
> 19:48:18,531 WARN akka.remote.ReliableDeliverySupervisor
> - Association with remote system
> [akka.tcp://flink@127.0.0.1:53543] has failed, address is now gated
> for [5000] ms. Reason is: [Disassociated].
> 19:48:18,768 INFO org.apache.flink.runtime.taskmanager.Task
> - Source: src (1/1) switched to CANCELED
> 19:48:18,768 INFO org.apache.flink.runtime.taskmanager.Task
> - bolt (1/8) switched to CANCELED
> 19:48:18,768 INFO org.apache.flink.runtime.taskmanager.Task
> - Freeing task resources for Source: src (1/1)
> 19:48:18,768 INFO org.apache.flink.runtime.taskmanager.Task
> - Freeing task resources for bolt (1/8)
> 19:48:18,769 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - Unregistering task and sending final execution state
> CANCELED to JobManager for task Source: src
> (3535644576ae695d2685a65401e16fc4)
> 19:48:18,769 INFO org.apache.flink.runtime.taskmanager.TaskManager
> - Unregistering task and sending final execution state
> CANCELED to JobManager for task bolt
> (391ac2875a2fdc86d8af4f2d51e3e849)
> 19:48:18,770 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt
> (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from CANCELING to
> CANCELED
> 19:48:18,770 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph -
> Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from
> CANCELING to CANCELED
> 19:48:18,773 INFO org.apache.flink.runtime.jobmanager.JobManager
> - Status of job 6f52281fb987a3def8d3c01e1fc0bdcb (topology)
> changed to FAILED.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Not enough free slots available to run the job. You can decrease the
> operator parallelism or increase the number of slots per TaskManager
> in the configuration. Task to schedule: < Attempt #0 (bolt (2/8)) @
> (unassigned) - [SCHEDULED] > with groupID <
> 52431fe365bb7d0f12dd8d20e2528427 > in sharing group < SlotSharingGroup
> [52431fe365bb7d0f12dd8d20e2528427, be3fd4192759bf8d5b39e028f005d7f4]
>>. Resources available to scheduler: Number of instances=1, total
> number of slots=1, available slots=0
> at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:256)
> at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleImmediately(Scheduler.java:131)
> at org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:298)
> at org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:458)
> at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.scheduleAll(ExecutionJobVertex.java:322)
> at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:686)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:982)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:962)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:962)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> From a quick look at the log, it seems that my configuration has a problem?
>
> Thank you.
> Best regards,
> Shinhyung
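The NoResourceAvailableException in the log above reports "total number of slots=1" while the bolt is deployed with parallelism 8 ("bolt (1/8)" through "bolt (8/8)"). A minimal sketch of that arithmetic follows, assuming Flink's default behaviour in which all operators of a job share slots, so the job needs as many slots as its most parallel operator. The class SlotCheck, its method names, and the layer value 3 (inferred from 2^3 = 8) are illustrative assumptions; they are not part of Flink or of the posted code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not a Flink API: compares the slots a job needs
// with the slots the cluster offers.
class SlotCheck {

    // The posted App.java passes Math.pow(2, n) as the bolt parallelism.
    static int boltParallelism(int layer) {
        return 1 << layer;
    }

    // Assumption: every operator is in the same slot sharing group, so the
    // job needs as many slots as its most parallel operator.
    static int requiredSlots(Map<String, Integer> parallelismByOperator) {
        int max = 0;
        for (int p : parallelismByOperator.values()) {
            max = Math.max(max, p);
        }
        return max;
    }

    public static void main(String[] args) {
        Map<String, Integer> topology = new LinkedHashMap<>();
        topology.put("src", 1);                    // "Source: src (1/1)" in the log
        topology.put("bolt", boltParallelism(3));  // "bolt (1/8)" in the log; layer 3 is assumed

        int totalSlots = 1;                        // "total number of slots=1" in the log
        int needed = requiredSlots(topology);
        System.out.println("needed=" + needed + ", available=" + totalSlots
                + ", fits=" + (needed <= totalSlots));
        // prints: needed=8, available=1, fits=false
    }
}
```

With these numbers the job cannot be scheduled: either the bolt parallelism has to drop or the TaskManager has to offer at least 8 slots, which is what the exception message suggests. In a standalone setup the slot count is normally set with the taskmanager.numberOfTaskSlots entry in conf/flink-conf.yaml.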
>
> On Thu, Jan 14, 2016 at 7:29 PM, Matthias J. Sax wrote:
>> Hi,
>>
>> I can submit the topology without any problems. Your code is fine.
>>
>> If your program "exits silently" I would actually assume, that you
>> submitted the topology successfully. Can you see the topology in
>> JobManager WebFrontend? If not, do you see any errors in the log files?
>>
>> -Matthias
>>
>> On 01/14/2016 07:37 AM, Shinhyung Yang wrote:
>>> Dear Matthias,
>>>
>>> Thank you for the reply! I am so sorry to respond late on the matter.
>>>
>>>> I just double checked the Flink code and during translation from Storm
>>>> to Flink declareOutputFields() is called twice. You are right that it
>>>> does the same job twice, but that is actually not a problem. The Flink
>>>> code is cleaner this way, so I guess we will not change it.
>>>
>>> Thank you for checking. I don't think it contributed any part of my
>>> current problem anyways. For my case though, it is called 3 times if
>>> the number is important at all.
>>>
>>>> About lifecycle:
>>>> If you submit your code, during deployment, Spout.open() and
>>>> Bolt.prepare() should be called for each parallel instance on each
>>>> Spout/Bolt of your topology.
>>>>
>>>> About your submission (I guess this should solve your current problem):
>>>> If you use bin/start-local.sh, you should *not* use FlinkLocalCluster,
>>>> but FlinkSubmitter. You have to distinguish three cases:
>>>>
>>>> - local/debug/IDE mode: use FlinkLocalCluster
>>>>   => you do not need to start any Flink cluster beforehand --
>>>>      FlinkLocalCluster is started up in your current JVM
>>>>   * the purpose is local debugging in an IDE (this makes it easy to
>>>>     set breakpoints and debug code)
>>>>
>>>> - pseudo-distributed mode: use FlinkSubmitter
>>>>   => you start up a local Flink cluster via bin/start-local.sh
>>>>   * this local Flink cluster runs in its own JVM and looks like a real
>>>>     cluster to the Flink client, i.e., "bin/flink run"
>>>>   * thus, you just use FlinkSubmitter as for a real cluster (with
>>>>     JobManager/Nimbus hostname "localhost")
>>>>   * in contrast to FlinkLocalCluster, no "internal Flink Cluster" is
>>>>     started in your current JVM, but your code is shipped to the local
>>>>     cluster you started up beforehand via bin/start-local.sh and
>>>>     executed in that JVM
>>>>
>>>> - distributed mode: use FlinkSubmitter
>>>>   => you start up Flink in a real cluster using bin/start-cluster.sh
>>>>   * you use "bin/flink run" to submit your code to the real cluster
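The three cases above can be summarised as a small lookup. This is only a sketch of the decision described in the message: the class SubmissionModes, its enum, and its method names are hypothetical, and the returned strings are just the class and script names mentioned in the thread, not calls into the Flink API.

```java
// Hypothetical summary of the three submission modes; not part of flink-storm.
class SubmissionModes {

    enum Mode { LOCAL_DEBUG, PSEUDO_DISTRIBUTED, DISTRIBUTED }

    // Which flink-storm entry point the message recommends for each mode.
    static String submitterFor(Mode mode) {
        return mode == Mode.LOCAL_DEBUG ? "FlinkLocalCluster" : "FlinkSubmitter";
    }

    // Which script has to be run before submitting; null means the
    // cluster is started inside the current JVM and no script is needed.
    static String startScriptFor(Mode mode) {
        switch (mode) {
            case PSEUDO_DISTRIBUTED:
                return "bin/start-local.sh";
            case DISTRIBUTED:
                return "bin/start-cluster.sh";
            default:
                return null;
        }
    }

    public static void main(String[] args) {
        for (Mode mode : Mode.values()) {
            System.out.println(mode + ": " + submitterFor(mode)
                    + ", start script: " + startScriptFor(mode));
        }
    }
}
```

The point of the distinction is that FlinkLocalCluster and a cluster started via bin/start-local.sh are independent of each other, so combining the two does not submit anything to the running cluster.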
>>>
>>> Thank you for the explanation; I now have a clearer understanding of
>>> clusters and submitters. However, my problem is not fixed yet. Here's
>>> my code:
>>>
>>> ////////////////////////////////////////////////////////////////////////////////
>>> // ./src/main/java/myexample/App.java
>>> ////////////////////////////////////////////////////////////////////////////////
>>>
>>> package myexample;
>>>
>>> import backtype.storm.Config;
>>> import backtype.storm.LocalCluster;
>>> import myexample.spout.StandaloneSpout;
>>> import backtype.storm.generated.StormTopology;
>>> import backtype.storm.topology.IRichSpout;
>>> import backtype.storm.topology.TopologyBuilder;
>>> import backtype.storm.topology.base.BaseBasicBolt;
>>>
>>> import myexample.bolt.Node;
>>> import myexample.bolt.StandardBolt;
>>>
>>> import java.util.Arrays;
>>> import java.util.List;
>>>
>>> //
>>> import org.apache.flink.storm.api.FlinkTopology;
>>> //import org.apache.flink.storm.api.FlinkLocalCluster;
>>> import org.apache.flink.storm.api.FlinkSubmitter;
>>> //import org.apache.flink.storm.api.FlinkClient;
>>> import org.apache.flink.storm.api.FlinkTopologyBuilder;
>>>
>>> public class App
>>> {
>>>     public static void main( String[] args ) throws Exception
>>>     {
>>>         int layer = 0;
>>>         StandaloneSpout spout = new StandaloneSpout();
>>>         Config conf = new Config();
>>>         conf.put(Config.TOPOLOGY_DEBUG, false);
>>>         //FlinkLocalCluster cluster = new FlinkLocalCluster();
>>>         //FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
>>>         //LocalCluster cluster = new LocalCluster();
>>>
>>>         layer = Integer.parseInt(args[0]);
>>>         //cluster.submitTopology("topology", conf, BinaryTopology(spout, layer));
>>>         FlinkSubmitter.submitTopology("topology", conf,
>>>                 BinaryTopology(spout, layer));
>>>         //Thread.sleep(5 * 1000);
>>>         //FlinkClient.getConfiguredClient(conf).killTopology("topology");
>>>         //cluster.killTopology("topology");
>>>         //cluster.shutdown();
>>>     }
>>>
>>>     public static FlinkTopology BinaryTopology(IRichSpout input, int n) {
>>>     //public static StormTopology BinaryTopology(IRichSpout input, int n) {
>>>         return BinaryTopology(input, n,
>>>                 Arrays.asList((BaseBasicBolt)new StandardBolt()));
>>>     }
>>>
>>>     public static FlinkTopology BinaryTopology(IRichSpout input, int n,
>>>             List<BaseBasicBolt> boltList) {
>>>     //public static StormTopology BinaryTopology(IRichSpout input, int n, List<BaseBasicBolt> boltList) {
>>>         FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
>>>         //TopologyBuilder builder = new TopologyBuilder();
>>>         String sourceId = "src";
>>>         builder.setSpout(sourceId, input);
>>>
>>>
>>>         String boltId = "bolt";
>>>         builder.setBolt(boltId, new Node(), Math.pow(2,
>>>                 n)).shuffleGrouping(sourceId);
>>>
>>>         return builder.createTopology();
>>>     }
>>> }
>>>
>>> ////////////////////////////////////////////////////////////////////////////////
>>> // ./src/main/java/myexample/spout/StandaloneSpout.java
>>> ////////////////////////////////////////////////////////////////////////////////
>>>
>>> package myexample.spout;
>>>
>>> import backtype.storm.spout.SpoutOutputCollector;
>>> import backtype.storm.task.TopologyContext;
>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>> import backtype.storm.topology.base.BaseRichSpout;
>>> import backtype.storm.tuple.Fields;
>>> import backtype.storm.tuple.Values;
>>>
>>> import java.io.*;
>>> import java.text.DateFormat;
>>> import java.text.SimpleDateFormat;
>>> import java.util.*;
>>>
>>> public class StandaloneSpout extends BaseRichSpout {
>>>
>>>     private SpoutOutputCollector mCollector;
>>>
>>>     @Override
>>>     public void open(Map conf, TopologyContext context,
>>>             SpoutOutputCollector collector) {
>>>         this.mCollector = collector;
>>>     }
>>>
>>>     @Override
>>>     public void nextTuple() {
>>>         long currentTime = System.currentTimeMillis();
>>>
>>>         // TODO: Currently, do not check bound of list, because of
>>>         // experiment.(Avoid branch)
>>>         mCollector.emit(new Values(new String("aaa"),
>>>                 System.currentTimeMillis(), 0));
>>>
>>>     }
>>>
>>>     @Override
>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>         System.out.println("declareOutputFields");
>>>         declarer.declare(new Fields("string1", "timestamp", "omitted"));
>>>     }
>>> }
>>>
>>> ////////////////////////////////////////////////////////////////////////////////
>>> // ./src/main/java/myexample/bolt/Node.java
>>> ////////////////////////////////////////////////////////////////////////////////
>>>
>>> package myexample.bolt;
>>>
>>> import backtype.storm.task.TopologyContext;
>>> import backtype.storm.topology.BasicOutputCollector;
>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>> import backtype.storm.topology.base.BaseBasicBolt;
>>> import backtype.storm.tuple.Fields;
>>> import backtype.storm.tuple.Tuple;
>>> import backtype.storm.tuple.Values;
>>> import java.util.Map;
>>>
>>> public class Node extends BaseBasicBolt {
>>>
>>>     public static boolean isTupleEmpty(Tuple tuple) {
>>>         return false;
>>>     }
>>>
>>>     @Override
>>>     public void prepare(Map stormConf, TopologyContext context) {
>>>         super.prepare(stormConf, context);
>>>     }
>>>
>>>     @Override
>>>     public void cleanup() {
>>>         super.cleanup();
>>>     }
>>>
>>>     @Override
>>>     public void execute(Tuple tuple, BasicOutputCollector collector) {
>>>         collector.emit(new Values("aaa", 1, System.currentTimeMillis(), 0));
>>>     }
>>>
>>>     @Override
>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>         declarer.declare(new Fields("string1", "string2", "timestamp",
>>>                 "omitted"));
>>>     }
>>> }
>>>
>>> ////////////////////////////////////////////////////////////////////////////////
>>> // ./src/main/java/myexample/bolt/StandardBolt.java
>>> ////////////////////////////////////////////////////////////////////////////////
>>>
>>> package myexample.bolt;
>>>
>>> import java.util.Map;
>>>
>>> import backtype.storm.task.TopologyContext;
>>> import backtype.storm.topology.BasicOutputCollector;
>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>> import backtype.storm.topology.base.BaseBasicBolt;
>>> import backtype.storm.tuple.Tuple;
>>>
>>> public class StandardBolt extends BaseBasicBolt {
>>>
>>>     @Override
>>>     public void prepare(Map stormConf, TopologyContext context) {
>>>         super.prepare(stormConf, context);
>>>     }
>>>
>>>     @Override
>>>     public void execute(Tuple tuple, BasicOutputCollector collector) {
>>>     }
>>>
>>>     @Override
>>>     public void declareOutputFields(OutputFieldsDeclarer ofd) {
>>>     }
>>> }
>>>
>>> The problem is probably either in the source code or somewhere in the
>>> project environment. I would really appreciate it if you could verify
>>> whether the code looks OK or not.
>>>
>>>>
>>>> About further debugging: you can increase the log level to get more
>>>> information:
>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/logging.html
>>>
>>> I tried to inject a `log4j.properties' file, taken from a sample
>>> flink-quickstart-java application created with `mvn
>>> archetype:generate', into ./target/*.jar, but it does not work. I tried
>>> this because placing that `log4j.properties' file under
>>> ./src/main/resources of my project did not work in the first place.
>>>
>>> Thank you again for your help.
>>> With best regards,
>>> Shinhyung
>>>
>>>> Hope this helps!
>>>>
>>>> -Matthias
>>>>
>>>> On 01/09/2016 04:38 PM, Shinhyung Yang wrote:
>>>>> Dear Matthias,
>>>>>
>>>>> Thank you for replying!
>>>>>
>>>>> that sounds weird and should not happen -- Spout.open() should get
>>>>> called exactly once.
>>>>>
>>>>>
>>>>> That's what I thought too. I'm new to both Storm and Flink, so it's still
>>>>> quite complicated for me to handle both; would it help if I knew
>>>>> Storm's lifecycle and Flink's lifecycle? When submitTopology() is
>>>>> invoked, what should be called other than spout.open()?
>>>>>
>>>>> I am not sure about multiple calls to declareOutputFields though -- it
>>>>> might be called multiple times -- I would need to double check the code.
>>>>>
>>>>>
>>>>> I'll check my code too.
>>>>>
>>>>>
>>>>> However, the call to declareOutputFields should be idempotent, so it
>>>>> should actually not be a problem if it is called multiple times. Even if
>>>>> Storm might call this method only once, there is no guarantee that it is
>>>>> not called multiple times. If this is a problem for you, please let me
>>>>> know. I think we could fix this and make sure the method is only called
>>>>> once.
>>>>>
>>>>>
>>>>> Actually it doesn't seem to be a problem for now. It just does the same
>>>>> job multiple times.
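The idempotency argument above can be illustrated with a small, self-contained sketch. RecordingDeclarer is a hypothetical stand-in for Storm's OutputFieldsDeclarer (it is not the real interface), and the field names are the ones declared by the StandaloneSpout posted in this thread; the assumption is that a declaration replaces the previous schema rather than appending to it.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration; does not use the Storm or Flink APIs.
class IdempotentDeclare {

    // Stand-in for an output fields declarer: remembers only the latest
    // schema and counts how often it was asked to declare one.
    static class RecordingDeclarer {
        private List<String> fields;
        private int calls;

        void declare(String... names) {
            fields = Arrays.asList(names);
            calls++;
        }

        List<String> fields() { return fields; }

        int calls() { return calls; }
    }

    // Same shape as StandaloneSpout.declareOutputFields: it only declares
    // the schema and changes no other state.
    static void declareOutputFields(RecordingDeclarer declarer) {
        declarer.declare("string1", "timestamp", "omitted");
    }

    public static void main(String[] args) {
        RecordingDeclarer declarer = new RecordingDeclarer();
        for (int i = 0; i < 3; i++) {
            declareOutputFields(declarer);
        }
        System.out.println(declarer.fields() + " after " + declarer.calls() + " calls");
        // prints: [string1, timestamp, omitted] after 3 calls
    }
}
```

Under that assumption, one call or three calls leave the declared schema in the same state, which is why the repeated invocation is only redundant work and not a correctness problem.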
>>>>>
>>>>>
>>>>> It would be helpful if you could share your code. What do you mean by
>>>>> "exits silently"? No submission happens? Did you check the logs? As you
>>>>> mentioned FlinkLocalCluster, I assume that you run within an IDE?
>>>>>
>>>>>
>>>>> The topology doesn't seem to continue. There's a set of initialization
>>>>> code in the open method of the program's spout, and it looks hopeless if
>>>>> it's not invoked. Is there any way to check the logs other than using
>>>>> println() calls? I'm running it on the command line, with
>>>>> `bin/start-local.sh' running in the background, using `bin/flink run'.
>>>>>
>>>>>
>>>>> Btw: lately we fixed a couple of bugs. I would suggest that you use the
>>>>> latest version from the Flink master branch. It should work with 0.10.1
>>>>> without problems.
>>>>>
>>>>>
>>>>> It was very tedious for me to deal with a pom.xml file and the .m2
>>>>> repository, so I preferred to use Maven Central. But I will try the
>>>>> master branch if I have to.
>>>>>
>>>>> I will quickly check if I could share some of the code.
>>>>>
>>>>> Thank you again for the help!
>>>>> With best regards,
>>>>> Shinhyung Yang
>>>>>
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>>
>>>>> On 01/09/2016 01:27 AM, Shinhyung Yang wrote:
>>>>> > Howdies to everyone,
>>>>> >
>>>>> > I'm trying to use the storm compatibility layer on Flink 0.10.1. The
>>>>> > original storm topology works fine on Storm 0.9.5 and I have
>>>>> > incorporated FlinkLocalCluster, FlinkTopologyBuilder, and
>>>>> > FlinkTopology classes according to the programming guide
>>>>> > (https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/storm_compatibility.html).
>>>>> > I'm running it on Oracle Java 8 (1.8.0_66-b17) on Centos 7 (7.2.1511).
>>>>> > What happens is that it seems to get all the way to the submitTopology
>>>>> > method without any problem; however, it doesn't invoke the open method
>>>>> > of the Spout class, while the declareOutputFields method is called
>>>>> > multiple times, and the program exits silently. Do you guys have any
>>>>> > idea what's going on here, or any suggestions? If needed, please ask
>>>>> > me for more information.
>>>>> >
>>>>> > Thank you for reading.
>>>>> > With best regards,
>>>>> > Shinhyung Yang
>>>>> >
>>>>>
>>>>
>>