map is not executed when using 'bin/flink run'

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

map is not executed when using 'bin/flink run'

sandeep6
Hi there,

I have the following code.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> messageStream = env
.addSource(new FlinkKafkaConsumer09<>(
topic,
new MsgPackDeserializer(),
props)); // The Properties will be passed to the KafkaConsumer


messageStream
.rebalance()
.filter(s->s.contains("XYZ"))
.map(s -> s + " :" + new Date())
.print();

This works fine when I run the jar individually. eg. java -jar Myjar.jar
But when I start flink with local mode (bin/start-local.sh) and run the jar using bin/flink run Myjar.jar the map part doesn't seem to ru
Inline image 3Inline image 4

It received messages from kafka but it din't print them.

I'm using following dependencies-
compile 'org.apache.flink:flink-connector-kafka-0.9_2.10:1.1.3'
compile 'org.apache.flink:flink-streaming-java_2.10:1.1.3'

Reply | Threaded
Open this post in threaded view
|

Re: map is not executed when using 'bin/flink run'

Till Rohrmann
Hi,

have you checked the taskmanager.out file in the logs directory? This file should contain the stdout output.

Cheers,
Till

On Mon, Nov 14, 2016 at 5:19 PM, Meghashyam Sandeep V <[hidden email]> wrote:
Hi there,

I have the following code.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> messageStream = env
.addSource(new FlinkKafkaConsumer09<>(
topic,
new MsgPackDeserializer(),
props)); // The Properties will be passed to the KafkaConsumer


messageStream
.rebalance()
.filter(s->s.contains("XYZ"))
.map(s -> s + " :" + new Date())
.print();

This works fine when I run the jar individually. eg. java -jar Myjar.jar
But when I start flink with local mode (bin/start-local.sh) and run the jar using bin/flink run Myjar.jar the map part doesn't seem to ru
Inline image 3Inline image 4

It received messages from kafka but it din't print them.

I'm using following dependencies-
compile 'org.apache.flink:flink-connector-kafka-0.9_2.10:1.1.3'
compile 'org.apache.flink:flink-streaming-java_2.10:1.1.3'