Getting java.lang.Exception when try to fetch data from Kafka

prateekarora

Hi

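I am sending data using the KafkaProducer API:

    imageRecord = new ProducerRecord<String, byte[]>(topic, messageKey, imageData);
    producer.send(imageRecord);


In the Flink program I try to fetch the data using FlinkKafkaConsumer08. Below is the sample code:

    import java.util.Properties

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
    import org.apache.flink.streaming.util.serialization.{KeyedDeserializationSchema, TypeInformationKeyValueSerializationSchema}

    object FlinkKafkaExample {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // Connection settings for the Kafka 0.8 consumer
        val properties = new Properties()
        properties.setProperty("bootstrap.servers", "<IPADDRESS>:9092")
        properties.setProperty("zookeeper.connect", "<IPADDRESS>:2181")
        properties.setProperty("group.id", "test")

        // Key/value schema built on Flink's own type serializers.
        // Unchecked cast: this schema actually produces Flink's Java Tuple2, not a Scala tuple.
        val readSchema = new TypeInformationKeyValueSerializationSchema[String, Array[Byte]](
          classOf[String], classOf[Array[Byte]], env.getConfig)
          .asInstanceOf[KeyedDeserializationSchema[(String, Array[Byte])]]

        val stream: DataStream[(String, Array[Byte])] =
          env.addSource(new FlinkKafkaConsumer08[(String, Array[Byte])]("a-0", readSchema, properties))

        stream.print()
        env.execute("Flink Kafka Example")
      }
    }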


but I get the following error:

16/04/22 13:43:39 INFO ExecutionGraph: Source: Custom Source -> Sink: Unnamed (1/4) (d7a151560f6eabdc587a23dc0975cb84) switched from RUNNING to FAILED
16/04/22 13:43:39 INFO ExecutionGraph: Source: Custom Source -> Sink: Unnamed (2/4) (d43754a27e402ed5b02a73d1c9aa3125) switched from RUNNING to CANCELING

java.lang.Exception
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:222)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.run(FlinkKafkaConsumer08.java:316)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
    at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:298)
    at org.apache.flink.types.StringValue.readString(StringValue.java:771)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
    at org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema.deserialize(TypeInformationKeyValueSerializationSchema.java:105)
    at org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema.deserialize(TypeInformationKeyValueSerializationSchema.java:39)
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:657)


Regards
Prateek

Re: Getting java.lang.Exception when try to fetch data from Kafka

rmetzger0
Hi Prateek,

Were the messages written to the Kafka topic by Flink, using the TypeInformationKeyValueSerializationSchema? If not, the Flink deserializers may expect a different data format than the one the messages in the topic actually have.

How are the messages written into the topic?
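To make the suspected format mismatch concrete, here is a minimal Java sketch that encodes the same key in two ways. `kafkaStyle` assumes Kafka's StringSerializer with its default UTF-8 encoding (plain bytes, no length prefix). `flinkStyle` is a simplified model, written for illustration rather than copied from Flink's source, of the layout I believe Flink's StringSerializer uses via StringValue: a base-128 length prefix storing length + 1, followed by each character as a base-128 value. The class name `KeyEncodings` is hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class KeyEncodings {

    // Plain UTF-8 bytes with no length prefix: what Kafka's StringSerializer
    // writes for the key (assuming its default UTF-8 encoding).
    static byte[] kafkaStyle(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // Simplified model (assumption) of Flink's StringValue layout:
    // base-128 prefix holding length + 1, then each char as a base-128 value.
    static byte[] flinkStyle(String s) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarInt(out, s.length() + 1); // +1 so that 0 can encode a null string
        for (int i = 0; i < s.length(); i++) {
            writeVarInt(out, s.charAt(i));
        }
        return out.toByteArray();
    }

    // Low 7 bits first; the high bit is set on every byte except the last.
    private static void writeVarInt(ByteArrayOutputStream out, int value) {
        while (value >= 0x80) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    public static void main(String[] args) {
        String key = Integer.toString(42); // same shape as Integer.toString(messageKey)
        System.out.println("Kafka-style: " + Arrays.toString(kafkaStyle(key)));
        System.out.println("Flink-style: " + Arrays.toString(flinkStyle(key)));
    }
}
```

For the key "42" this prints `Kafka-style: [52, 50]` and `Flink-style: [3, 52, 50]`, so a reader expecting the second layout will misinterpret the first.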


On Fri, Apr 22, 2016 at 10:21 PM, prateekarora <[hidden email]> wrote:
--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Getting-java-lang-Exception-when-try-to-fetch-data-from-Kafka-tp6365.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.


Re: Getting java.lang.Exception when try to fetch data from Kafka

prateekarora
Hi

I have a Java program that sends data into a Kafka topic using the Kafka client API (0.8.2).

Here is the sample code I use to send data to the Kafka topic:

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

ProducerRecord<String, byte[]> imageRecord;
imageRecord = new ProducerRecord<String, byte[]>(streamInfo.topic,
                        Integer.toString(messageKey), imageData);

producer.send(imageRecord);


Regards
Prateek

Re: Getting java.lang.Exception when try to fetch data from Kafka

prateekarora
Hi

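I have a Java program that sends data into a Kafka topic. Below is the code:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;

private Producer<String, byte[]> producer = null;

// Kafka's StringSerializer writes the key as plain UTF-8 bytes (its default encoding);
// ByteArraySerializer writes the image bytes unchanged.
Serializer<String> keySerializer = new StringSerializer();
Serializer<byte[]> valueSerializer = new ByteArraySerializer();
producer = new KafkaProducer<String, byte[]>(props, keySerializer, valueSerializer);

ProducerRecord<String, byte[]> imageRecord;
imageRecord = new ProducerRecord<String, byte[]>(streamInfo.topic,
                        Integer.toString(messageKey), imageData);

producer.send(imageRecord);


Then I try to fetch the data in Apache Flink.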

Regards
Prateek

Re: Getting java.lang.Exception when try to fetch data from Kafka

rmetzger0
Hi Prateek,

Sorry for the late response. Can you try implementing your own DeserializationSchema, in which you deserialize the String key manually (just call the new String(byte[]) constructor)?

The TypeInformationKeyValueSerializationSchema[String, Array[Byte]] generates deserializers from Flink's internal serializer stack, and these assume that the data has also been serialized by Flink. I think Flink's StringSerializer uses its own optimized encoding, which is not compatible with the plain byte format that new String(byte[]) expects.
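The EOFException at the bottom of the stack trace can be reproduced with a simplified model of that encoding. The sketch below (hypothetical class `FlinkStyleReader`; it only approximates Flink's StringValue.readString) reads a length-prefixed string the way Flink's format expects, and then feeds it the raw UTF-8 key bytes that Kafka's StringSerializer would have written. The first key byte is consumed as the length, so the reader runs out of input, much like DataInputDeserializer.readUnsignedByte does in the reported trace. As an assumption worth verifying: Flink's serializer for byte arrays also expects its own length prefix, so the image value would likely be misread as well even with a correctly decoded key.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

final class FlinkStyleReader {

    // Simplified model (assumption) of Flink's string layout: a base-128 length
    // prefix stored as length + 1, then one base-128 value per char.
    static String readFlinkStyle(DataInputStream in) throws IOException {
        int len = readVarInt(in);
        if (len == 0) {
            return null; // 0 encodes a null string
        }
        len -= 1;
        char[] chars = new char[len];
        for (int i = 0; i < len; i++) {
            chars[i] = (char) readVarInt(in); // throws EOFException when input runs out
        }
        return new String(chars);
    }

    private static int readVarInt(DataInputStream in) throws IOException {
        int result = 0;
        int shift = 0;
        int b;
        while ((b = in.readUnsignedByte()) >= 0x80) {
            result |= (b & 0x7F) << shift;
            shift += 7;
        }
        return result | (b << shift);
    }

    public static void main(String[] args) throws IOException {
        // Key bytes as the Kafka producer wrote them: plain UTF-8, no prefix.
        byte[] rawKey = "42".getBytes(StandardCharsets.UTF_8);
        try {
            readFlinkStyle(new DataInputStream(new ByteArrayInputStream(rawKey)));
        } catch (EOFException e) {
            // '4' (0x34 = 52) is read as the prefix, so 51 chars are expected,
            // but only one byte ('2') remains.
            System.out.println("EOFException, as in the reported stack trace");
        }
    }
}
```

Running `main` prints `EOFException, as in the reported stack trace`; the same reader decodes the Flink-style bytes `[3, 52, 50]` back to "42" without error.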


Re: Getting java.lang.Exception when try to fetch data from Kafka

prateekarora
Thanks for the response.

Can you please suggest a link or an example showing how to write my own DeserializationSchema?

Regards
Prateek


Re: Getting java.lang.Exception when try to fetch data from Kafka

rmetzger0
I would refer to the SimpleStringSchema as an example.
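Following that suggestion, the decoding itself is small. Below is a minimal Java sketch of the logic a custom keyed schema could delegate to: the key is turned back into a String with an explicit UTF-8 charset (matching the assumed default of Kafka's StringSerializer; new String(byte[]) alone would use the platform default), and the image bytes are passed through unchanged because ByteArraySerializer wrote them as-is. `RawKeyValueDecoder` and its methods are hypothetical names, and the sketch deliberately avoids Flink classes so it runs on its own. To use it in the job, one would implement KeyedDeserializationSchema (the interface TypeInformationKeyValueSerializationSchema is cast to in the code above), call this logic from its deserialize method, return false from isEndOfStream, and report the produced type from getProducedType. The exact parameter list of deserialize may differ between Flink versions, so check the interface in the version you build against.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.Map;

// Hypothetical helper holding the decoding logic a custom keyed schema could call.
final class RawKeyValueDecoder {

    // Key: written by a String serializer, so decode the raw bytes as UTF-8.
    // Kafka messages may have no key, in which case the bytes are null.
    static String decodeKey(byte[] keyBytes) {
        return keyBytes == null ? null : new String(keyBytes, StandardCharsets.UTF_8);
    }

    // Value: ByteArraySerializer wrote the image bytes unchanged, so pass them through.
    static byte[] decodeValue(byte[] valueBytes) {
        return valueBytes;
    }

    static Map.Entry<String, byte[]> decode(byte[] keyBytes, byte[] valueBytes) {
        return new AbstractMap.SimpleImmutableEntry<>(decodeKey(keyBytes), decodeValue(valueBytes));
    }

    public static void main(String[] args) {
        byte[] key = "42".getBytes(StandardCharsets.UTF_8);
        byte[] image = {(byte) 0xFF, (byte) 0xD8, 0x00}; // arbitrary sample bytes
        Map.Entry<String, byte[]> record = decode(key, image);
        System.out.println(record.getKey() + " -> " + record.getValue().length + " bytes");
    }
}
```

Running `main` prints `42 -> 3 bytes`. Because nothing here depends on Flink's internal serializers, the key and image bytes are read exactly as the Kafka producer wrote them.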
