Not able to read Kafka Messages with FlinkKafkaConsumer010

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Not able to read Kafka Messages with FlinkKafkaConsumer010

Sridhar Chellappa
I am pretty sure I am doing something wrong here. Just that I do not understand why?

I wrote a small program that reads messages from Kafka and prints it out.



public class Main {

private static final int CHECKPOINT_INTERVAL = 100000;


private static Properties getpropsFromEnv() {
Properties props = new Properties();
props.setProperty("bootstrap.servers", System.getenv("KAFKA_ADDRESS"));
props.setProperty("group.id", System.getenv("CONSUMER_GROUP_ID"));
props.setProperty("topic", System.getenv("KAFKA_TOPIC"));
return props;
}

public static void main(String[] args) throws Exception {


Properties props = getpropsFromEnv();


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(CHECKPOINT_INTERVAL);
env.setParallelism(1);
FlinkKafkaConsumer010<LogMessage> flinkConsumer =
new FlinkKafkaConsumer010<LogMessage>(
Arrays.asList(parameterTool.getRequired("topic").split(",")), new LogDeserializationSchema(), parameterTool.getProperties()
);

DataStream<LogMessage> logMessageDataStream = env.addSource(flinkConsumer);
logMessageDataStream.print();

env.execute("SomeJob");

}
}

public class LogDeserializationSchema implements DeserializationSchema<LogMessage> {

@Override
public LogMessage deserialize(byte[] message) {
LogMessage logMessage = null;
try {
logMessage = LogMessage.parseFrom(message);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
} finally {
return logMessage;
}
}

@Override
public boolean isEndOfStream(LogMessage nextElement) {
return false;
}

@Override
public TypeInformation<LogMessage> getProducedType() {
return TypeExtractor.getForClass(LogMessage.class);
}
}


When I run this program, I do not see any messages being read by the consumer. 

Things to note :

1. I ran kafka-console-consumer using the same Kafka parameters and saw continuous output.
2. My Gradle file has the following depencies :

dependencies {
compile group: 'ch.qos.logback', name: 'logback-classic', version: '1.1.7'


compile 'org.aeonbits.owner:owner:1.0.9'
compile group: 'com.mashape.unirest', name: 'unirest-java', version: '1.4.9' // For driver Suspension
compile group: 'joda-time', name: 'joda-time', version: '2.9.4'
compile 'com.google.protobuf:protobuf-java-util:3.1.0'

/*
* Flink Dependencies
*/
compile group: 'org.apache.flink', name: 'flink-java', version: '1.3.0'
compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.10_2.10', version: '1.3.0'
compile group: 'org.apache.flink', name: 'flink-streaming-java_2.10', version: '1.3.0'
compile group: 'org.apache.flink', name: 'flink-clients_2.10', version: '1.3.0'




}

Can Someone please help ?
Reply | Threaded
Open this post in threaded view
|

Re: Not able to read Kafka Messages with FlinkKafkaConsumer010

mmziyad
Hi Sridhar

Are you using ParameterTool to set the properties? I couldn't see it in your code, but you use it in the below line:
FlinkKafkaConsumer010<LogMessage> flinkConsumer =
new FlinkKafkaConsumer010<LogMessage>(
Arrays.asList(parameterTool.getRequired("topic").split(",")), new LogDeserializationSchema(), parameterTool.getProperties());

Make sure that the correct properties are passed to FlinkKafkaConsumer.

Best
Ziyad


Best Regards
Ziyad Muhammed Mohiyudheen 
407, Internationales Studienzentrum Berlin
Theodor-Heuss-Platz 5
14052 Berlin 
Ph: <a href="tel:%2B49%20176%206587%203343" value="+4917665873343" style="color:rgb(17,85,204)" target="_blank">+49 176 6587 3343
Mail to: [hidden email]

On Tue, Jul 11, 2017 at 9:12 AM, Sridhar Chellappa <[hidden email]> wrote:
I am pretty sure I am doing something wrong here. Just that I do not understand why?

I wrote a small program that reads messages from Kafka and prints it out.



public class Main {

private static final int CHECKPOINT_INTERVAL = 100000;


private static Properties getpropsFromEnv() {
Properties props = new Properties();
props.setProperty("bootstrap.servers", System.getenv("KAFKA_ADDRESS"));
props.setProperty("group.id", System.getenv("CONSUMER_GROUP_ID"));
props.setProperty("topic", System.getenv("KAFKA_TOPIC"));
return props;
}

public static void main(String[] args) throws Exception {


Properties props = getpropsFromEnv();


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(CHECKPOINT_INTERVAL);
env.setParallelism(1);
FlinkKafkaConsumer010<LogMessage> flinkConsumer =
new FlinkKafkaConsumer010<LogMessage>(
Arrays.asList(parameterTool.getRequired("topic").split(",")), new LogDeserializationSchema(), parameterTool.getProperties()
);

DataStream<LogMessage> logMessageDataStream = env.addSource(flinkConsumer);
logMessageDataStream.print();

env.execute("SomeJob");

}
}

public class LogDeserializationSchema implements DeserializationSchema<LogMessage> {

@Override
public LogMessage deserialize(byte[] message) {
LogMessage logMessage = null;
try {
logMessage = LogMessage.parseFrom(message);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
} finally {
return logMessage;
}
}

@Override
public boolean isEndOfStream(LogMessage nextElement) {
return false;
}

@Override
public TypeInformation<LogMessage> getProducedType() {
return TypeExtractor.getForClass(LogMessage.class);
}
}


When I run this program, I do not see any messages being read by the consumer. 

Things to note :

1. I ran kafka-console-consumer using the same Kafka parameters and saw continuous output.
2. My Gradle file has the following depencies :

dependencies {
compile group: 'ch.qos.logback', name: 'logback-classic', version: '1.1.7'


compile 'org.aeonbits.owner:owner:1.0.9'
compile group: 'com.mashape.unirest', name: 'unirest-java', version: '1.4.9' // For driver Suspension
compile group: 'joda-time', name: 'joda-time', version: '2.9.4'
compile 'com.google.protobuf:protobuf-java-util:3.1.0'

/*
* Flink Dependencies
*/
compile group: 'org.apache.flink', name: 'flink-java', version: '1.3.0'
compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.10_2.10', version: '1.3.0'
compile group: 'org.apache.flink', name: 'flink-streaming-java_2.10', version: '1.3.0'
compile group: 'org.apache.flink', name: 'flink-clients_2.10', version: '1.3.0'




}

Can Someone please help ?

Reply | Threaded
Open this post in threaded view
|

Re: Not able to read Kafka Messages with FlinkKafkaConsumer010

Sridhar Chellappa
Thanks Ziyad. That was a cut and paste error. Anyway, I figured out a solution to the issue. All of my Flink dependancies were pointing at 1.3.1. Pointing at 1.3.0 resolved the issue.

On Wed, Jul 12, 2017 at 2:17 AM, Ziyad Muhammed <[hidden email]> wrote:
Hi Sridhar

Are you using ParameterTool to set the properties? I couldn't see it in your code, but you use it in the below line:
FlinkKafkaConsumer010<LogMessage> flinkConsumer =
new FlinkKafkaConsumer010<LogMessage>(
Arrays.asList(parameterTool.getRequired("topic").split(",")), new LogDeserializationSchema(), parameterTool.getProperties());

Make sure that the correct properties are passed to FlinkKafkaConsumer.

Best
Ziyad


Best Regards
Ziyad Muhammed Mohiyudheen 
407, Internationales Studienzentrum Berlin
Theodor-Heuss-Platz 5
14052 Berlin 
Ph: <a href="tel:%2B49%20176%206587%203343" value="+4917665873343" style="color:rgb(17,85,204)" target="_blank">+49 176 6587 3343
Mail to: [hidden email]

On Tue, Jul 11, 2017 at 9:12 AM, Sridhar Chellappa <[hidden email]> wrote:
I am pretty sure I am doing something wrong here. Just that I do not understand why?

I wrote a small program that reads messages from Kafka and prints it out.



public class Main {

private static final int CHECKPOINT_INTERVAL = 100000;


private static Properties getpropsFromEnv() {
Properties props = new Properties();
props.setProperty("bootstrap.servers", System.getenv("KAFKA_ADDRESS"));
props.setProperty("group.id", System.getenv("CONSUMER_GROUP_ID"));
props.setProperty("topic", System.getenv("KAFKA_TOPIC"));
return props;
}

public static void main(String[] args) throws Exception {


Properties props = getpropsFromEnv();


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(CHECKPOINT_INTERVAL);
env.setParallelism(1);
FlinkKafkaConsumer010<LogMessage> flinkConsumer =
new FlinkKafkaConsumer010<LogMessage>(
Arrays.asList(parameterTool.getRequired("topic").split(",")), new LogDeserializationSchema(), parameterTool.getProperties()
);

DataStream<LogMessage> logMessageDataStream = env.addSource(flinkConsumer);
logMessageDataStream.print();

env.execute("SomeJob");

}
}

public class LogDeserializationSchema implements DeserializationSchema<LogMessage> {

@Override
public LogMessage deserialize(byte[] message) {
LogMessage logMessage = null;
try {
logMessage = LogMessage.parseFrom(message);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
} finally {
return logMessage;
}
}

@Override
public boolean isEndOfStream(LogMessage nextElement) {
return false;
}

@Override
public TypeInformation<LogMessage> getProducedType() {
return TypeExtractor.getForClass(LogMessage.class);
}
}


When I run this program, I do not see any messages being read by the consumer. 

Things to note :

1. I ran kafka-console-consumer using the same Kafka parameters and saw continuous output.
2. My Gradle file has the following depencies :

dependencies {
compile group: 'ch.qos.logback', name: 'logback-classic', version: '1.1.7'


compile 'org.aeonbits.owner:owner:1.0.9'
compile group: 'com.mashape.unirest', name: 'unirest-java', version: '1.4.9' // For driver Suspension
compile group: 'joda-time', name: 'joda-time', version: '2.9.4'
compile 'com.google.protobuf:protobuf-java-util:3.1.0'

/*
* Flink Dependencies
*/
compile group: 'org.apache.flink', name: 'flink-java', version: '1.3.0'
compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.10_2.10', version: '1.3.0'
compile group: 'org.apache.flink', name: 'flink-streaming-java_2.10', version: '1.3.0'
compile group: 'org.apache.flink', name: 'flink-clients_2.10', version: '1.3.0'




}

Can Someone please help ?