Hello,
I am new to Flink and trying to learn this framework. It seems great so far. I am trying to translate my existing Storm topology to a Flink job, and I am having trouble consuming data from Kafka. Here's what my job looks like:
public static void main(String[] args) throws Exception {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "hostname:port");
    properties.setProperty("group.id", "stream-test");
    properties.setProperty("client.id", "test-flink");
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<String> kafkaStream =
        env.addSource(new FlinkKafkaConsumer09<>("test", new SimpleStringSchema(), properties));
    kafkaStream.addSink(new StringLogSink());
    env.execute();
}
There are messages being sent to Kafka on that topic; I just never see anything in Flink. Any help or insight you could provide would be greatly appreciated. If it makes a difference, this is running on YARN. Also, here's what I see in the logs:
2016-04-26 18:02:38,707 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.9.0.1
2016-04-26 18:02:38,707 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 23c69d62a0cabf06
2016-04-26 18:02:38,708 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 - Trying to get partitions for topic test
2016-04-26 18:02:38,854 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 - Got 1 partitions from these topics: [test]
2016-04-26 18:02:38,854 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer is going to read the following topics (with number of partitions): test (1),
2016-04-26 18:02:38,933 INFO org.apache.flink.yarn.YarnJobManager - Submitting job 0ab4248d8917e707a8f297420e4c564d ().
2016-04-26 18:02:38,934 INFO org.apache.flink.yarn.YarnJobManager - Using restart strategy NoRestartStrategy for 0ab4248d8917e707a8f297420e4c564d.
2016-04-26 18:02:38,935 INFO org.apache.flink.yarn.YarnJobManager - Scheduling job 0ab4248d8917e707a8f297420e4c564d ().
2016-04-26 18:02:38,935 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Sink: Unnamed (1/1) (9bd0abdf59a1af934527257a990910ff) switched from CREATED to SCHEDULED
2016-04-26 18:02:38,935 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Sink: Unnamed (1/1) (9bd0abdf59a1af934527257a990910ff) switched from SCHEDULED to DEPLOYING
2016-04-26 18:02:38,935 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: Custom Source -> Sink: Unnamed (1/1) (attempt #0) to ip-10-167-233-231
2016-04-26 18:02:38,936 INFO org.apache.flink.yarn.YarnJobManager - Status of job 0ab4248d8917e707a8f297420e4c564d () changed to RUNNING.
2016-04-26 18:02:39,151 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Sink: Unnamed (1/1) (9bd0abdf59a1af934527257a990910ff) switched from DEPLOYING to RUNNING

Thanks,
Josh
|
Hi,
You can check whether any messages are flowing through the dataflow on the Flink web dashboard.
Dominik Choma
|
Hi,
the web interface is a good idea for checking whether everything is working as expected. However, in this case I expect the counts for the task to be 0, because the source and sink are chained together into one task (upcoming Flink releases will fix this behavior).
I'm asking to make sure you're looking in the right place for the output. It will be in the logs of the YARN containers.
Regards,
Robert

On Tue, Apr 26, 2016 at 8:34 PM, Dominik Choma <[hidden email]> wrote:
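Robert's point about chaining can be checked directly. Below is a minimal sketch, assuming the Flink 1.0-era Kafka 0.9 connector from the original job (the `SimpleStringSchema` package shown here is the one from that era and moved in later releases). Calling `env.disableOperatorChaining()` puts the source and the sink into separate tasks, so records crossing from one to the other should show up as non-zero counts in the web dashboard if Kafka messages actually arrive. The class name `UnchainedKafkaJob`, the helper `kafkaProperties`, and the use of `print()` in place of `StringLogSink` are illustrative choices, not something from the thread.

```java
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

// Hypothetical debugging variant of the job above (names are illustrative).
class UnchainedKafkaJob {

    // Builds the consumer properties; "hostname:port" stays a placeholder, as in the original.
    static Properties kafkaProperties(String bootstrapServers, String groupId) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        properties.setProperty("group.id", groupId);
        return properties;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Keep every operator in its own task so that records flowing from the
        // source to the sink are counted in the web dashboard. Unchained operators
        // exchange serialized records, so treat this as a debugging aid only.
        env.disableOperatorChaining();

        DataStreamSource<String> kafkaStream = env.addSource(
            new FlinkKafkaConsumer09<>("test", new SimpleStringSchema(),
                kafkaProperties("hostname:port", "stream-test")));

        // print() writes each record to the stdout of the TaskManager running the sink.
        kafkaStream.print();

        env.execute("kafka-debug");
    }
}
```

Once the job looks healthy, removing the `disableOperatorChaining()` call restores the default chaining behavior.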
|
StringLogSink just looks like:
    System.out.println(msg);
    LOG.info("Logging message: " + msg);
And LOG is from slf4j. In the Flink UI that is running on YARN, I see no counts, and no log statements or stdout output under the JobManager. It seems to make no difference whether I submit the job to YARN via the command line or through the Flink UI of the session already running on YARN. Where would you recommend I look in the YARN containers?
Thanks again for your help.
Josh
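For reference, a complete `StringLogSink` along the lines Josh describes might look roughly like the sketch below. Only the two statements inside `invoke()` come from the thread; the class shape is an assumption based on the `SinkFunction` interface of the Flink 1.x releases of that time (later 1.x releases add an `invoke(value, context)` variant). Because the sink runs inside a TaskManager, both statements write to that TaskManager's stdout and log files, which is consistent with nothing appearing under the JobManager.

```java
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical reconstruction of StringLogSink; only the body of invoke() is from the thread.
class StringLogSink implements SinkFunction<String> {

    // static, so the (non-serializable) Logger is not shipped with the serialized sink instance
    private static final Logger LOG = LoggerFactory.getLogger(StringLogSink.class);

    @Override
    public void invoke(String msg) throws Exception {
        // Both lines execute in the TaskManager JVM, so the output lands in the
        // TaskManager's .out and .log files, not in the JobManager's.
        System.out.println(msg);
        LOG.info("Logging message: " + msg);
    }
}
```

Making the logger `static` is the usual choice for Flink user functions, since the function object itself is serialized and shipped to the TaskManagers.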
From: Robert Metzger <[hidden email]>
Reply-To: "[hidden email]" <[hidden email]>
Date: Tuesday, April 26, 2016 at 3:30 PM
To: "[hidden email]" <[hidden email]>
Subject: [External] Re: Consuming Messages from Kafka
|
Hi Josh,
The JobManager log won't contain this output. Check out these slides I did a while ago; they explain how you can retrieve the logs from the TaskManagers: http://www.slideshare.net/robertmetzger1/apache-flink-hands-on#14

On Tue, Apr 26, 2016 at 9:41 PM, Conlin, Joshua [USA] <[hidden email]> wrote:
|