Below is my setup:

1. Kafka ZooKeeper and broker on one machine (192.168.1.116), with the producer (192.168.1.100) and consumer (192.168.1.117) on other machines. --> This works fine, no issue.
2. Running the standalone Beam application with a Kafka consumer. --> This works fine.
3. Running the Beam application on a Flink cluster with a Kafka consumer. --> This is not working; no messages are received from the Kafka producer.

The same program works fine standalone with the Flink runner. Below is my code snippet:

    public static void main(String[] args) {
        Pipeline p = initializePipeline(args);
        Map<String, List<String>> intelliOmIms = new TreeMap<>();

        PTransform<PBegin, PCollection<KV<Integer, byte[]>>> reader;
        reader = KafkaIO.<Integer, byte[]>read()
                .withBootstrapServers("192.168.1.116:9092")  // Kafka ZooKeeper and broker run here
                .withTopic("kafkatest")
                .withKeyDeserializer(IntegerDeserializer.class)
                .withValueDeserializer(IntelliOmImsKpiDataUtil.class)
                .withoutMetadata();

        PCollection<KV<Integer, byte[]>> output = p.apply(reader);
        output.apply(ParDo.of(new PrintMsg()));

        p.run().waitUntilFinish();
    }

In the IntelliOmImsKpiDataUtil deserializer I am just printing a message saying that Kafka received the message.

    public static class PrintMsg extends DoFn<KV<Integer, byte[]>, Void> {

        @ProcessElement
        public void processElement(ProcessContext c) {
            System.out.println("Received Message .... from kafkatest Topic ");
        }
    }

Started ZooKeeper on 192.168.1.116 like below:

    bin/zookeeper-server-start.sh config/zookeeper.properties

Started the broker on 192.168.1.116 like below:

    bin/kafka-server-start.sh config/server.properties

Started the producer on 192.168.1.100 like below:

    bin/kafka-console-producer.sh --broker-list 192.168.1.116:9092 --topic kafkatest

Started the consumer on 192.168.1.117 like below:

    bin/kafka-console-consumer.sh --zookeeper 192.168.1.116:2181 --topic kafkatest --from-beginning

With the standalone Beam application the messages are received, but in the cluster setup it is not working.

Can you please help me check it?
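(Note: IntelliOmImsKpiDataUtil itself is not shown in the thread. Based on the description above, it is presumably a Kafka Deserializer<byte[]> along these lines -- a hypothetical sketch, not the poster's actual class:)

    import java.util.Map;
    import org.apache.kafka.common.serialization.Deserializer;

    // Hypothetical reconstruction of the value deserializer described in the thread.
    public class IntelliOmImsKpiDataUtil implements Deserializer<byte[]> {

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // Nothing to configure for a pass-through deserializer.
        }

        @Override
        public byte[] deserialize(String topic, byte[] data) {
            // Matches the "Received in Deserilize.." line seen in the local-run output below.
            System.out.println("Received in Deserilize..");
            return data;  // pass the raw bytes through unchanged
        }

        @Override
        public void close() {
        }
    }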
Hi,
What do you mean by:

> With standalone beam application kafka can receive the message, But in cluster setup it is not working.

In your example you are reading data from Kafka and printing it to the console. There doesn't seem to be anything that writes back to Kafka, so what do you mean by "Kafka can not receive the message"? Did you check the output file of your application in the log directory? Did you check the Flink logs for any errors?

Piotrek
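(An aside on where to look, assuming a default standalone Flink layout; exact file names vary by Flink version: anything user code prints with System.out.println ends up in the .out file of the TaskManager that runs the task, on that worker machine, not in the JobManager log or the client console.)

    # On each TaskManager machine, from the Flink installation directory:
    ls log/
    tail -f log/flink-*-taskmanager-*.out        # stdout of user code (System.out.println)
    grep -iE "error|exception" log/flink-*-taskmanager-*.log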
Hi,
I mean the same code works fine in a local Flink setup. I can see "Received Message .... from testkafka Topic : " on the console when Kafka receives a message (the Kafka producer is on another machine and frequently sends messages to the testkafka topic).

Submitted the Beam application to local Flink with the command below:

    mvn compile exec:java -Dexec.mainClass=org.apache.beam.influxdb.KafkaRead -Pflink-runner

Output is:

    Connected to JobManager at Actor[akka://flink/user/jobmanager_1#735957608] with leader session id d97c060d-bdf9-4215-8d7c-138f13cbff1e.
    10/13/2017 11:09:09  Job execution switched to status RUNNING.
    10/13/2017 11:09:09  Source: Read(UnboundedKafkaSource) -> Flat Map -> ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to SCHEDULED
    10/13/2017 11:09:09  Source: Read(UnboundedKafkaSource) -> Flat Map -> ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to DEPLOYING
    10/13/2017 11:09:09  Source: Read(UnboundedKafkaSource) -> Flat Map -> ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to RUNNING
    Received in Deserilize..
    Received Message .... from testkafka Topic : HELLOASA

If I run the same code on a Flink cluster I cannot see any message in log/stdout, but the job keeps running, and the Kafka producer on the other machine keeps sending messages to the testkafka topic.

I started the Flink cluster with the command below:

    bin/start-cluster.sh

Submitted the Beam application to the Flink cluster with the command below:

    bin/flink run -c org.apache.beam.influxdb.KafkaRead /home/root1/NAI/US_Working/NAI_KPI/Kafka_Proto_Sub/target/kafka-proto-sub-bundled-2.1.0.jar --runner=FlinkRunner --flinkMaster=192.168.1.116 --filesToStage=/home/root1/NAI/US_Working/NAI_KPI/Kafka_Proto_Sub/target/kafka-proto-sub-bundled-2.1.0.jar

In the dashboard:

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1169/DashBoard.png>

I cannot see any message in the dashboard:

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1169/Stdout.png>

As per the log, job execution is running:

    Cluster configuration: Standalone cluster with JobManager at /192.168.1.116:6123
    Using address 192.168.1.116:6123 to connect to JobManager.
    JobManager web interface address http://192.168.1.116:8081
    Starting execution of program
    Submitting job with JobID: 8d731f801d00268f951a98d093f21e0c. Waiting for job completion.
    Connected to JobManager at Actor[akka.tcp://flink@192.168.1.116:6123/user/jobmanager#422012792] with leader session id 00000000-0000-0000-0000-000000000000.
    10/13/2017 11:10:57  Job execution switched to status RUNNING.
    10/13/2017 11:10:57  Source: Read(UnboundedKafkaSource) -> Flat Map -> ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to SCHEDULED
    10/13/2017 11:10:57  Source: Read(UnboundedKafkaSource) -> Flat Map -> ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to DEPLOYING
    10/13/2017 11:11:05  Source: Read(UnboundedKafkaSource) -> Flat Map -> ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to RUNNING

There is no exception in the log. I suspect the Kafka deployment has an issue.

Can you please help me check it? My code is below:
    public static void main(String[] args) {
        Pipeline p = initializePipeline(args);
        Map<String, List<String>> intelliOmIms = new TreeMap<>();

        PTransform<PBegin, PCollection<KV<Integer, byte[]>>> reader;
        reader = KafkaIO.<Integer, byte[]>read()
                .withBootstrapServers("192.168.1.116:9092")  // Kafka ZooKeeper and broker run here
                .withTopic("kafkatest")
                .withKeyDeserializer(IntegerDeserializer.class)
                .withValueDeserializer(IntelliOmImsKpiDataUtil.class)
                .withoutMetadata();

        PCollection<KV<Integer, byte[]>> output = p.apply(reader);
        output.apply(ParDo.of(new PrintMsg()));

        p.run().waitUntilFinish();
    }

    public static class PrintMsg extends DoFn<KV<Integer, byte[]>, Void> {

        @ProcessElement
        public void processElement(ProcessContext c) {
            try {
                System.out.println("Received Message .... from testkafka Topic : "
                        + new String(c.element().getValue(), "UTF-8"));
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
    }
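(An aside, not from the thread: on a cluster, System.out.println in a DoFn goes to the stdout .out file of whichever TaskManager executes it, which is easy to overlook. A sketch of an alternative is to log through SLF4J, which Flink collects into the TaskManager's .log file; imports are shown for completeness, and the class stays nested as in the original:)

    import java.nio.charset.StandardCharsets;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public static class PrintMsg extends DoFn<KV<Integer, byte[]>, Void> {

        // SLF4J output lands in the TaskManager's .log file on the worker machine.
        private static final Logger LOG = LoggerFactory.getLogger(PrintMsg.class);

        @ProcessElement
        public void processElement(ProcessContext c) {
            // StandardCharsets avoids the checked UnsupportedEncodingException.
            LOG.info("Received Message .... from testkafka Topic : {}",
                    new String(c.element().getValue(), StandardCharsets.UTF_8));
        }
    }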
Hi,
What version of Flink are you using? In earlier 1.3.x releases there were some bugs in the Kafka consumer code. Could you change the log level in Flink to debug? Did you check the Kafka logs for some hint, maybe? I guess that metrics like bytes read/input records of this Flink application are not changing?

Piotrek
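(For the log-level suggestion, a sketch assuming a standalone setup; defaults vary by Flink version: edit conf/log4j.properties on the JobManager and TaskManager machines, then restart the cluster.)

    # conf/log4j.properties
    log4j.rootLogger=DEBUG, file

    # Or raise verbosity only for the Kafka client internals:
    log4j.logger.org.apache.kafka=DEBUG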
Hi Piotrek,
I was checking the logs on the JobManager machine, and the dashboard. But the output string was actually recorded in the TaskManager machine's log file. I added InfluxDB and verified that the received data is being written into InfluxDB.

Thank you very much for your support.

Thanks,
Shankara