Flink not reading from Kafka

Mohit Anchlia
I have this code that reads from a topic; however, the Flink process comes up and waits forever even though there is data in the topic. I'm not sure why. Has anyone else seen this problem?

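import java.util.Properties;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "xxxx:9092");
properties.setProperty("group.id", "test1");
// only consulted when group "test1" has no committed offsets yet
properties.setProperty("auto.offset.reset", "earliest");

FlatMapFunction<Integer, Tuple2<Integer, Integer>> flatMapper = //something

// Kafka 0.10 source reading topic "test" as strings
DataStream<String> stream = env
        .addSource(new FlinkKafkaConsumer010<>("test", new SimpleStringSchema(), properties));

// parse each record as an Integer, apply flatMapper, print to stdout of the worker running the sink
stream.map(s -> Integer.valueOf(s))
        .flatMap(flatMapper)
        .returns(new TypeHint<Tuple2<Integer, Integer>>() {})
        .print();

JobExecutionResult res = env.execute();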

02/17/2017 16:50:25 Source: Custom Source -> Map -> Flat Map -> Sink: Unnamed(4/4) switched to RUNNING
02/17/2017 16:50:25 Source: Custom Source -> Map -> Flat Map -> Sink: Unnamed(1/4) switched to RUNNING
02/17/2017 16:50:25 Source: Custom Source -> Map -> Flat Map -> Sink: Unnamed(2/4) switched to RUNNING

Re: Flink not reading from Kafka

Mohit Anchlia
Interestingly enough, the same job runs OK on Linux but not on Windows.

Re: Flink not reading from Kafka

rmetzger0
Hi Mohit,

Is new data being produced into the topic?
The properties.setProperty("auto.offset.reset", "earliest"); setting only applies if you haven't consumed anything in this consumer group yet.
So if you have already read all the data in the topic with this group, you won't see anything new show up.
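To make that rule concrete, here is a small stand-alone Java model of how a Kafka consumer picks its start offset for one partition. It is a simplification written for illustration, not Kafka's or Flink's actual code: a committed offset for the group that is still inside the retained log wins, and auto.offset.reset is only consulted when no such offset exists. StartOffsetModel and startOffset are hypothetical names. Under this model, re-reading the topic from the beginning requires a group.id that has never committed offsets.

```java
import java.util.Optional;

// Hypothetical, simplified model of start-offset selection for one partition.
final class StartOffsetModel {

    // The three values Kafka accepts for "auto.offset.reset".
    enum ResetPolicy { EARLIEST, LATEST, NONE }

    /**
     * committed: offset committed by the consumer group, if any (the next offset to read).
     * logStart/logEnd: first retained offset and the offset the next written record will get.
     */
    static long startOffset(Optional<Long> committed, long logStart, long logEnd, ResetPolicy policy) {
        // A valid committed offset is used as-is, so "earliest" has no effect here.
        if (committed.isPresent()) {
            long offset = committed.get();
            if (offset >= logStart && offset <= logEnd) {
                return offset;
            }
        }
        // Only without a valid committed offset does the reset policy decide.
        switch (policy) {
            case EARLIEST:
                return logStart;
            case LATEST:
                return logEnd;
            default:
                throw new IllegalStateException("no valid committed offset and auto.offset.reset=none");
        }
    }

    public static void main(String[] args) {
        // Partition holds offsets 0..99; group already consumed everything (committed = 100).
        System.out.println(startOffset(Optional.of(100L), 0L, 100L, ResetPolicy.EARLIEST)); // 100
        // A fresh group id has no committed offset, so "earliest" applies.
        System.out.println(startOffset(Optional.empty(), 0L, 100L, ResetPolicy.EARLIEST)); // 0
    }
}
```

The first call corresponds to rerunning the job with group "test1" after everything was consumed: it starts at the end and waits for new records, which matches the symptom described in the original post.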

Re: Flink not reading from Kafka

Debasish Ghosh
I was facing a similar problem yesterday. In my case print() was not working. Try adding a sink that writes the output to another Kafka topic, something like https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/WriteIntoKafka.java#L71

It worked for me. Is stdout somehow disabled by default?

regards.

Re: Flink not reading from Kafka

rmetzger0
Hi Debasish,

did you run Flink in a distributed setup? print() writes the stream contents to stdout on the respective worker node (TaskManager), not on the machine that submitted the job.
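When searching a TaskManager's stdout for results, it helps to know roughly what print() emits. The sketch below is a model, not Flink code, and assumes (based on Flink 1.x's PrintSinkFunction; verify against your version) that with parallelism greater than 1 each subtask prefixes its lines with its 1-based index and "> ", and that those lines go to System.out of whichever TaskManager process runs that subtask. PrintSinkModel and formatLine are hypothetical names.

```java
// Hypothetical model of the line format produced by a print() sink subtask.
final class PrintSinkModel {

    /** subtaskIndex is 0-based, as in Flink's runtime context; the printed prefix is 1-based. */
    static String formatLine(int subtaskIndex, int parallelism, Object record) {
        // Assumed behavior: no prefix when the sink runs with parallelism 1.
        String prefix = parallelism > 1 ? (subtaskIndex + 1) + "> " : "";
        return prefix + record;
    }

    public static void main(String[] args) {
        // The job in this thread ran with 4 parallel subtasks ("Unnamed(1/4)" ... "(4/4)").
        int parallelism = 4;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            // Prints "1> (1,2)" through "4> (1,2)".
            System.out.println(formatLine(subtask, parallelism, "(1,2)"));
        }
    }
}
```

In a distributed setup each of those lines would land in a different process's stdout, which is why nothing appears on the client that submitted the job.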

Re: Flink not reading from Kafka

Debasish Ghosh
Yes, I was running Flink on a DC/OS cluster.

AFAIR, I checked the TaskManager log from the Flink UI in Mesos, and it said stdout was not available. But this may be because Flink on DC/OS is not yet very stable.

regards.

On Fri, Feb 24, 2017 at 1:41 AM, Robert Metzger <[hidden email]> wrote:
Hi Debashish,

did you execute Flink in a distributed setting? print() will output the stream contents on stdout on the respective worker node (taskmanager), not on the machine that submitted the job.

On Thu, Feb 23, 2017 at 5:41 PM, Debasish Ghosh <[hidden email]> wrote:
I was facing a similar problem yesterday. In my case print() was not working. Try adding a Sink and write the output to another Kafka topic. Something like https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/WriteIntoKafka.java#L71 ..

It worked for me. Is the stdout disabled somehow by default ?

regards.

On Thu, Feb 23, 2017 at 9:42 PM, Robert Metzger <[hidden email]> wrote:
Hi Mohit,

is there new data being produced into the topic?
The properties.setProperty("auto.offset.reset""earliest"); setting only applies if you haven't consumed anything in this consumer group.
So if you have read all the data in the topic before, you won't see anything new showing up.

On Sat, Feb 18, 2017 at 2:09 AM, Mohit Anchlia <[hidden email]> wrote:
Interestingly enough same job runs ok on Linux but not on windows

On Fri, Feb 17, 2017 at 4:54 PM, Mohit Anchlia <[hidden email]> wrote:
I have this code trying to read from a topic however the flink process comes up and waits forever even though there is data in the topic. Not sure why? Has anyone else seen this problem?

StreamExecutionEnvironment env = StreamExecutionEnvironment

.createLocalEnvironment();

Properties properties = new Properties();

properties.setProperty("bootstrap.servers", "xxxx:9092");

properties.setProperty("group.id", "test1");

properties.setProperty("auto.offset.reset", "earliest");

FlatMapFunction<Integer, Tuple2<Integer, Integer>> flatMapper = //something



 

DataStream<String> stream = env

.addSource(new FlinkKafkaConsumer010<>("test", new SimpleStringSchema(), properties));

stream.map(s -> Integer.valueOf(s)).flatMap(flatMapper).returns(

new TypeHint<Tuple2<Integer, Integer>>() {

}).print();

JobExecutionResult res = env.execute();



02/17/2017 16:50:25 Source: Custom Source -> Map -> Flat Map -> Sink: Unnamed(4/4) switched to RUNNING


02/17/2017 16:50:25 Source: Custom Source -> Map -> Flat Map -> Sink: Unnamed(1/4) switched to RUNNING


02/17/2017 16:50:25 Source: Custom Source -> Map -> Flat Map -> Sink: Unnamed(2/4) switched to RUNNING







--




--
Reply | Threaded
Open this post in threaded view
|

Re: Flink not reading from Kafka

rmetzger0
Hi,

It is possible that the stdout file is not properly available in the TaskManager UI.
I guess if you log into the machine directly and get the stdout file, you'll find the output.
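For a standalone setup, a sketch of locating those stdout files once logged in. It assumes the TaskManager's stdout is redirected to log/flink-<user>-taskmanager-<n>-<host>.out under the Flink installation, which is the layout the standalone start scripts use as far as I know; Mesos/DC/OS deployments may keep stdout in the task sandbox instead, so treat the directory and the file-name pattern as assumptions to adjust. TaskManagerStdoutFinder is a hypothetical helper name.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: list and dump TaskManager stdout files in a Flink log directory.
final class TaskManagerStdoutFinder {

    // Assumed naming of standalone TaskManager stdout files (an assumption, see above).
    static final String PATTERN = "flink-*-taskmanager-*.out";

    static List<Path> findStdoutFiles(Path logDir) throws IOException {
        List<Path> result = new ArrayList<>();
        // The glob matches file names inside logDir only; it does not recurse.
        try (DirectoryStream<Path> files = Files.newDirectoryStream(logDir, PATTERN)) {
            for (Path p : files) {
                result.add(p);
            }
        }
        Collections.sort(result);
        return result;
    }

    public static void main(String[] args) throws IOException {
        // Defaults to ./log, i.e. running from the Flink installation directory.
        Path logDir = Paths.get(args.length > 0 ? args[0] : "log");
        for (Path p : findStdoutFiles(logDir)) {
            System.out.println("== " + p);
            Files.readAllLines(p).forEach(System.out::println);
        }
    }
}
```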

Re: Flink not reading from Kafka

Debasish Ghosh
Maybe. I will try logging in to the machine directly and see.

regards.

On Fri, Feb 24, 2017 at 2:05 AM, Robert Metzger <[hidden email]> wrote:
Hi,

It is possible that the stdout file is not properly available in the taskmanager UI.
I guess if you log into the machine directly to get the stout file, you'll find the output.

On Thu, Feb 23, 2017 at 9:24 PM, Debasish Ghosh <[hidden email]> wrote:
Yes .. I was running Flink on a DC/OS cluster. 

AFAIR I checked the taskmanager log from the Flink UI in Mesos. It said stdout was not available. But this may be due to the fact that Flink on DC/OS is not yet very stable .. 

regards.

On Fri, Feb 24, 2017 at 1:41 AM, Robert Metzger <[hidden email]> wrote:
Hi Debashish,

did you execute Flink in a distributed setting? print() will output the stream contents on stdout on the respective worker node (taskmanager), not on the machine that submitted the job.

On Thu, Feb 23, 2017 at 5:41 PM, Debasish Ghosh <[hidden email]> wrote:
I was facing a similar problem yesterday. In my case print() was not working. Try adding a Sink and write the output to another Kafka topic. Something like https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/WriteIntoKafka.java#L71 ..

It worked for me. Is the stdout disabled somehow by default ?

regards.

On Thu, Feb 23, 2017 at 9:42 PM, Robert Metzger <[hidden email]> wrote:
Hi Mohit,

is there new data being produced into the topic?
The properties.setProperty("auto.offset.reset""earliest"); setting only applies if you haven't consumed anything in this consumer group.
So if you have read all the data in the topic before, you won't see anything new showing up.

On Sat, Feb 18, 2017 at 2:09 AM, Mohit Anchlia <[hidden email]> wrote:
Interestingly enough same job runs ok on Linux but not on windows

On Fri, Feb 17, 2017 at 4:54 PM, Mohit Anchlia <[hidden email]> wrote:
I have this code trying to read from a topic however the flink process comes up and waits forever even though there is data in the topic. Not sure why? Has anyone else seen this problem?

StreamExecutionEnvironment env = StreamExecutionEnvironment

.createLocalEnvironment();

Properties properties = new Properties();

properties.setProperty("bootstrap.servers", "xxxx:9092");

properties.setProperty("group.id", "test1");

properties.setProperty("auto.offset.reset", "earliest");

FlatMapFunction<Integer, Tuple2<Integer, Integer>> flatMapper = //something



 

DataStream<String> stream = env

.addSource(new FlinkKafkaConsumer010<>("test", new SimpleStringSchema(), properties));

stream.map(s -> Integer.valueOf(s)).flatMap(flatMapper).returns(

new TypeHint<Tuple2<Integer, Integer>>() {

}).print();

JobExecutionResult res = env.execute();



02/17/2017 16:50:25 Source: Custom Source -> Map -> Flat Map -> Sink: Unnamed(4/4) switched to RUNNING


02/17/2017 16:50:25 Source: Custom Source -> Map -> Flat Map -> Sink: Unnamed(1/4) switched to RUNNING


02/17/2017 16:50:25 Source: Custom Source -> Map -> Flat Map -> Sink: Unnamed(2/4) switched to RUNNING







--




--




--