Consuming Messages from Kafka

Consuming Messages from Kafka

Conlin, Joshua [USA]
Hello,

I am new to Flink and trying to learn this framework. It seems great so far. I am trying to translate my existing Storm topology to a Flink job, and I am having trouble consuming data from Kafka. Here's what my job looks like:

public static void main(String[] args) throws Exception {

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "hostname:port");
    properties.setProperty("group.id", "stream-test");
    properties.setProperty("client.id", "test-flink");

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStreamSource<String> kafkaStream = env.addSource(
            new FlinkKafkaConsumer09<>("test", new SimpleStringSchema(), properties));

    kafkaStream.addSink(new StringLogSink());

    env.execute();
}


Messages are being sent to Kafka on that topic, but I never see anything in Flink. Any help/insight you could provide would be greatly appreciated. If it makes a difference, this is running on YARN. Also, here's what I see in the logs:


2016-04-26 18:02:38,707 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka version : 0.9.0.1
2016-04-26 18:02:38,707 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka commitId : 23c69d62a0cabf06
2016-04-26 18:02:38,708 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09  - Trying to get partitions for topic test
2016-04-26 18:02:38,854 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09  - Got 1 partitions from these topics: [test]
2016-04-26 18:02:38,854 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer is going to read the following topics (with number of partitions): test (1), 
2016-04-26 18:02:38,933 INFO  org.apache.flink.yarn.YarnJobManager                          - Submitting job 0ab4248d8917e707a8f297420e4c564d ().
2016-04-26 18:02:38,934 INFO  org.apache.flink.yarn.YarnJobManager                          - Using restart strategy NoRestartStrategy for 0ab4248d8917e707a8f297420e4c564d.
2016-04-26 18:02:38,935 INFO  org.apache.flink.yarn.YarnJobManager                          - Scheduling job 0ab4248d8917e707a8f297420e4c564d ().
2016-04-26 18:02:38,935 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> Sink: Unnamed (1/1) (9bd0abdf59a1af934527257a990910ff) switched from CREATED to SCHEDULED
2016-04-26 18:02:38,935 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> Sink: Unnamed (1/1) (9bd0abdf59a1af934527257a990910ff) switched from SCHEDULED to DEPLOYING
2016-04-26 18:02:38,935 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Source: Custom Source -> Sink: Unnamed (1/1) (attempt #0) to ip-10-167-233-231
2016-04-26 18:02:38,936 INFO  org.apache.flink.yarn.YarnJobManager                          - Status of job 0ab4248d8917e707a8f297420e4c564d () changed to RUNNING.
2016-04-26 18:02:39,151 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> Sink: Unnamed (1/1) (9bd0abdf59a1af934527257a990910ff) switched from DEPLOYING to RUNNING

Thanks,

Josh

Re: Consuming Messages from Kafka

Dominik Choma
Hi,

You can check whether any messages are going through the dataflow on the Flink web dashboard.



Dominik Choma



Re: Consuming Messages from Kafka

rmetzger0
Hi,

The web interface is a good idea for checking whether everything is working as expected. However, in this case I expect the counts for the task to be 0, because the source and sink are chained together into one task (upcoming Flink releases will fix this behavior).
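
(Editor's note, not from the original thread: a minimal sketch of how chaining could be broken for debugging, so the source and sink show up as two separate tasks and the web UI shows the count of records flowing between them. The class name ChainingDebugJob and the job name are made up; the topic, properties, and StringLogSink are taken from the snippet earlier in the thread.)

import java.util.Properties;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ChainingDebugJob {

    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hostname:port");
        properties.setProperty("group.id", "stream-test");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Option 1: disable operator chaining for the whole job.
        env.disableOperatorChaining();

        // Option 2: break the chain only between this source and its sink,
        // so the UI shows two tasks and the records-sent/received counters between them.
        env.addSource(new FlinkKafkaConsumer09<>("test", new SimpleStringSchema(), properties))
           .disableChaining()
           .addSink(new StringLogSink());   // StringLogSink is the user's sink from this thread

        env.execute("chaining-debug");
    }
}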

I assume the "StringLogSink" is logging all incoming events. How do you do that: using slf4j, or by System.out.println?
I'm asking to make sure you're looking in the right place to capture the output. It will be in the YARN containers.

Regards,
Robert





Re: [External] Re: Consuming Messages from Kafka

Conlin, Joshua [USA]
“StringLogSink” just looks like:

System.out.println(msg);

LOG.info("Logging message: " + msg);


And LOG is from slf4j. In the Flink UI of the session running on YARN, I see no counts, and no log statements or stdout under the JobManager. It makes no difference whether I submit the job to YARN via the command line or through the Flink UI of the session already running on YARN. Where would you recommend I look in the YARN containers?
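
(Editor's note: a minimal sketch of what a sink like this presumably looks like. Only the class name and the two statements above come from the thread; the rest, including the SinkFunction signature used, is an assumption.)

import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical reconstruction of StringLogSink. Both outputs end up on the
// TaskManager that runs the sink, not on the JobManager: System.out goes to
// the YARN container's stdout file, LOG.info to the TaskManager's log file.
public class StringLogSink implements SinkFunction<String> {

    private static final Logger LOG = LoggerFactory.getLogger(StringLogSink.class);

    @Override
    public void invoke(String msg) throws Exception {
        System.out.println(msg);
        LOG.info("Logging message: " + msg);
    }
}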


Thanks again for your help.


Josh





Re: [External] Re: Consuming Messages from Kafka

rmetzger0
Hi Josh,

The JobManager log won't contain this output.

Check out these slides I did a while ago; they explain how you can retrieve the logs from the TaskManagers: http://www.slideshare.net/robertmetzger1/apache-flink-hands-on#14
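
(Editor's note, not from the original thread: while the YARN session is running, each TaskManager's stdout and log files are also reachable from the YARN ResourceManager UI via the container links. After the application has finished, they can usually be pulled with Hadoop's "yarn logs -applicationId <application id>" command, assuming log aggregation is enabled on the cluster.)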


