I have this code trying to read from a topic however the flink process comes up and waits forever even though there is data in the topic. Not sure why? Has anyone else seen this problem? StreamExecutionEnvironment env = StreamExecutionEnvironment .createLocalEnvironment();
Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "xxxx:9092"); properties.setProperty("group.id", "test1"); properties.setProperty("auto.offset.reset", "earliest"); FlatMapFunction<Integer, Tuple2<Integer, Integer>> flatMapper = //something
DataStream<String> stream = env .addSource(new FlinkKafkaConsumer010<>("test", new SimpleStringSchema(), properties));
stream.map(s -> Integer.valueOf(s)).flatMap(flatMapper).returns( new TypeHint<Tuple2<Integer, Integer>>() { }).print();
JobExecutionResult res = env.execute(); 02/17/2017 16:50:25 Source: Custom Source -> Map -> Flat Map -> Sink: Unnamed(4/4) switched to RUNNING
02/17/2017 16:50:25 Source: Custom Source -> Map -> Flat Map -> Sink: Unnamed(1/4) switched to RUNNING
02/17/2017 16:50:25 Source: Custom Source -> Map -> Flat Map -> Sink: Unnamed(2/4) switched to RUNNING |
Interestingly enough same job runs ok on Linux but not on windows On Fri, Feb 17, 2017 at 4:54 PM, Mohit Anchlia <[hidden email]> wrote:
|
Hi Mohit, is there new data being produced into the topic? The properties.setProperty("auto. So if you have read all the data in the topic before, you won't see anything new showing up. On Sat, Feb 18, 2017 at 2:09 AM, Mohit Anchlia <[hidden email]> wrote:
|
I was facing a similar problem yesterday. In my case print() was not working. Try adding a Sink and write the output to another Kafka topic. Something like https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/WriteIntoKafka.java#L71 .. It worked for me. Is the stdout disabled somehow by default ? regards. On Thu, Feb 23, 2017 at 9:42 PM, Robert Metzger <[hidden email]> wrote:
Debasish Ghosh http://manning.com/ghosh2 |
Hi Debashish, did you execute Flink in a distributed setting? print() will output the stream contents on stdout on the respective worker node (taskmanager), not on the machine that submitted the job. On Thu, Feb 23, 2017 at 5:41 PM, Debasish Ghosh <[hidden email]> wrote:
|
Yes .. I was running Flink on a DC/OS cluster. AFAIR I checked the taskmanager log from the Flink UI in Mesos. It said stdout was not available. But this may be due to the fact that Flink on DC/OS is not yet very stable .. regards. On Fri, Feb 24, 2017 at 1:41 AM, Robert Metzger <[hidden email]> wrote:
Debasish Ghosh http://manning.com/ghosh2 |
Hi, It is possible that the stdout file is not properly available in the taskmanager UI. I guess if you log into the machine directly to get the stout file, you'll find the output. On Thu, Feb 23, 2017 at 9:24 PM, Debasish Ghosh <[hidden email]> wrote:
|
May be .. I will try to log in to the machine directly and see .. regards. On Fri, Feb 24, 2017 at 2:05 AM, Robert Metzger <[hidden email]> wrote:
Debasish Ghosh http://manning.com/ghosh2 |
Free forum by Nabble | Edit this page |