How to fetch Kafka messages that have a [KEY, VALUE] pair

prateekarora
Hi

I am new to Apache Flink and have started using Flink version 1.0.1.

In my scenario, each Kafka message is a key-value pair of type [String, Array[Byte]].

I tried to use FlinkKafkaConsumer08 to fetch the data, but I don't know how to write a DeserializationSchema for that.

val stream: DataStream[(String, Array[Byte])] = env.addSource(new FlinkKafkaConsumer08[(String, Array[Byte])]("a-0", <DeserializationSchema>, properties))

Please help me solve this problem.

Regards
Prateek

Re: How to fetch Kafka messages that have a [KEY, VALUE] pair

Till Rohrmann

Depending on how the key-value pair is encoded, you could use the TypeInformationKeyValueSerializationSchema, providing BasicTypeInfo.STRING_TYPE_INFO and PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO as the key and value type information. But this only works if your data was serialized in the same fashion, that is, with Flink's own serializers for those types.
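
A minimal sketch of that suggestion, assuming the Flink 1.0.x Kafka connector is on the classpath. The object name KeyValueSchemaSketch, the helper buildSchema, and the sample key and bytes are made up for illustration. It round-trips one record through the schema without touching Kafka, which shows the caveat: the bytes it can read are the bytes it would itself have written.

```scala
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, PrimitiveArrayTypeInfo}
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema

// Hypothetical example object; only the Flink classes are real.
object KeyValueSchemaSketch {

  // Builds a schema for String keys and byte-array values from the two
  // type infos named in the reply above.
  def buildSchema(config: ExecutionConfig): TypeInformationKeyValueSerializationSchema[String, Array[Byte]] =
    new TypeInformationKeyValueSerializationSchema[String, Array[Byte]](
      BasicTypeInfo.STRING_TYPE_INFO,
      PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO,
      config)

  def main(args: Array[String]): Unit = {
    val schema = buildSchema(new ExecutionConfig)

    // The schema works on Flink's Java Tuple2, not on Scala tuples.
    val original = new JTuple2[String, Array[Byte]]("sensor-1", Array[Byte](1, 2, 3))

    // Serialize key and value the way a Flink producer using this schema would.
    val keyBytes = schema.serializeKey(original)
    val valueBytes = schema.serializeValue(original)

    // Deserialize them again. Topic, partition and offset are dummy values;
    // the five-argument signature is the one I believe Flink 1.0.x uses.
    val decoded = schema.deserialize(keyBytes, valueBytes, "a-0", 0, 0L)
    println(s"key=${decoded.f0}, value=${decoded.f1.mkString(",")}")
  }
}
```

The same schema instance should be usable as the second constructor argument of FlinkKafkaConsumer08[JTuple2[String, Array[Byte]]], followed by a map to a Scala tuple if one is needed. If the producer wrote plain UTF-8 keys rather than Flink-serialized strings, this schema is not expected to decode them correctly.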

Cheers,
Till


In reply to prateek arora's message of Fri, Apr 22, 2016 at 12:02 AM.


Re: How to fetch Kafka messages that have a [KEY, VALUE] pair

rmetzger0
If you've serialized your data with a custom format, you can also write a custom deserializer by implementing the KeyedDeserializationSchema interface.
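
A rough sketch of such a deserializer for the [String, Array[Byte]] case from the question. It assumes the key was written as plain UTF-8 bytes and that the value should be passed through untouched. Both are assumptions about the producer, and the class name StringKeyBytesValueSchema is made up. The deserialize signature is the one I believe Flink 1.0.x uses; other versions may differ.

```scala
import java.nio.charset.StandardCharsets

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema

// Hypothetical schema: UTF-8 string key, raw byte-array value.
class StringKeyBytesValueSchema extends KeyedDeserializationSchema[(String, Array[Byte])] {

  override def deserialize(
      messageKey: Array[Byte],
      message: Array[Byte],
      topic: String,
      partition: Int,
      offset: Long): (String, Array[Byte]) = {
    // Kafka messages may have no key, in which case messageKey is null.
    val key = if (messageKey == null) null else new String(messageKey, StandardCharsets.UTF_8)
    (key, message)
  }

  // The Kafka topic is unbounded, so no element marks the end of the stream.
  override def isEndOfStream(nextElement: (String, Array[Byte])): Boolean = false

  // Tells Flink the element type; createTypeInformation comes from the
  // org.apache.flink.streaming.api.scala package import.
  override def getProducedType: TypeInformation[(String, Array[Byte])] =
    createTypeInformation[(String, Array[Byte])]
}
```

With this class, the placeholder in the original snippet would become new StringKeyBytesValueSchema, i.e. new FlinkKafkaConsumer08[(String, Array[Byte])]("a-0", new StringKeyBytesValueSchema, properties).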

In reply to Till Rohrmann's message of Fri, Apr 22, 2016 at 2:35 PM.