Can't get keyed messages from Kafka

AndreaKinn
Hi,
I have already spent two days trying to read simple messages from Kafka, without success.

I have a Kafka producer written in JavaScript:

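var kafka = require('kafka-node');
var KeyedMessage = kafka.KeyedMessage;
var keyed_message = new KeyedMessage(key, string_to_sent);
// the payload field that names the destination topic is "topic" (singular)
var payload = [{ topic: topic, messages: keyed_message }];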

I want to retrieve both the key and the message in Flink. First I used SimpleStringSchema (just to verify that the string was deserialized correctly), and that works, but it ignores the key.
Now I also need the key, because I have to group messages by key, but I can't find a keyed deserializer that helps. If I understand correctly, TypeSchema is meant only for Flink-to-Flink communication, and JSONSchema obviously doesn't fit my needs.

Re: Can't get keyed messages from Kafka

Chesnay Schepler
Have you tried implementing a KeyedDeserializationSchema?

This receives both the message and key as byte arrays, which you could then
deserialize as strings and return them in a Tuple2<String, String>.
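A minimal sketch of the schema described above, assuming a Flink 1.2/1.3-era setup with the Kafka 0.10 connector (where KeyedDeserializationSchema lives in org.apache.flink.streaming.util.serialization) and UTF-8 encoded keys and values. The class name StringKeyValueSchema and the decode helper are hypothetical. Both byte arrays are decoded and returned as Tuple2<String, String>, and getProducedType() reports that element type to Flink.

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

/** Hypothetical schema: decodes a Kafka record's key and value bytes as UTF-8 strings. */
public class StringKeyValueSchema implements KeyedDeserializationSchema<Tuple2<String, String>> {

    // Flink tuples cannot hold null fields, so a record without a key (or with a null
    // value) is mapped to the empty string here; this is an assumption about the data.
    private static String decode(byte[] bytes) {
        return bytes == null ? "" : new String(bytes, StandardCharsets.UTF_8);
    }

    @Override
    public Tuple2<String, String> deserialize(byte[] messageKey, byte[] message,
                                              String topic, int partition, long offset) {
        // f0 = message key, f1 = message body
        return new Tuple2<>(decode(messageKey), decode(message));
    }

    @Override
    public boolean isEndOfStream(Tuple2<String, String> nextElement) {
        // A Kafka topic is unbounded, so the stream never ends on its own.
        return false;
    }

    @Override
    public TypeInformation<Tuple2<String, String>> getProducedType() {
        // Describes the emitted element type so Flink can choose serializers for it.
        return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
    }
}
```

Passed to `new FlinkKafkaConsumer010<>(topic, new StringKeyValueSchema(), properties)`, this yields a `DataStream<Tuple2<String, String>>` that can be grouped on the key with `keyBy(0)`.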

On 13.06.2017 12:36, AndreaKinn wrote:

> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-t-get-keyed-messages-from-Kafka-tp13687.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
>


Re: Can't get keyed messages from Kafka

AndreaKinn
But KeyedDeserializationSchema has only two implementations:

TypeInformationKeyValueSerializationSchema
JSONKeyValueDeserializationSchema


The first one gives me this error:

06/12/2017 02:09:12 Source: Custom Source(4/4) switched to FAILED
java.io.EOFException
    at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)

The second one, which produces JSON objects, obviously doesn't fit my needs.

I'm thinking about implementing a custom deserializer, but honestly I'm a newbie and I don't know where to start.

Re: Can't get keyed messages from Kafka

Chesnay Schepler
You have to create your own implementation that deserializes the byte
arrays into whatever type you want to use.

On 13.06.2017 13:19, AndreaKinn wrote:



Re: Can't get keyed messages from Kafka

AndreaKinn
Could you help me with this? I'm trying to implement a CustomDeserializer.
My Kafka messages are KeyedMessages whose key and message are both strings.
I created a new class named CustomObject to handle the message string, because it is more complex than a simple string.


import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

public class CustomDeserializer implements KeyedDeserializationSchema<Tuple2<String, CustomObject>> {

        @Override
        public boolean isEndOfStream(Tuple2<String, CustomObject> nextElement) {
                return false;
        }

        @Override
        public TypeInformation<Tuple2<String, CustomObject>> getProducedType() {
                return null;
        }

        @Override
        public Tuple2<String, CustomObject> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
                        throws IOException {

                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                String key = new String(messageKey, StandardCharsets.UTF_8);
                String msg = new String(message, StandardCharsets.UTF_8);
                CustomObject customObj = new CustomObject(msg);

                return new Tuple2<String, CustomObject>(key, customObj);
        }
}

Questions:

- I don't understand what the getProducedType method is for.
- Which methods do I have to implement in my CustomObject class?

My main method:

DataStream<Tuple2<String, CustomObject>> stream = env.addSource(new FlinkKafkaConsumer010<>("topicTest", new CustomDeserializer(), properties)).rebalance();

stream.print();

When I run it I get a NullPointerException, so I suspect I'm missing something in the CustomObject class: so far I have implemented only a toString() method.

Re: Can't get keyed messages from Kafka

Chesnay Schepler
In getProducedType(), replace the implementation with:

return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO,
        TypeExtractor.getForClass(CustomObject.class));
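For reference, a sketch of the whole deserializer from this thread with that getProducedType() in place. Assumptions: a Flink 1.2/1.3-era API (KeyedDeserializationSchema in org.apache.flink.streaming.util.serialization), UTF-8 payloads, and a hypothetical CustomObject that only stores the raw message text; it is nested here solely so the sketch compiles on its own, and the decode helper is likewise hypothetical. getProducedType() is how the source tells Flink the element type, which Flink needs to pick serializers for the records; since replacing `return null` fixed the NullPointerException in this thread, that null was the likely cause. On the CustomObject question: no particular methods are required. If the class is public, has a public no-argument constructor, and exposes its fields publicly (or through getters and setters), TypeExtractor treats it as a POJO; otherwise Flink falls back to generic Kryo serialization. toString() only matters for print().

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

public class CustomDeserializer
        implements KeyedDeserializationSchema<Tuple2<String, CustomDeserializer.CustomObject>> {

    /** Hypothetical stand-in for the poster's CustomObject: it just keeps the raw message text. */
    public static class CustomObject {
        // Public, non-final field: one of the field shapes Flink's POJO analysis accepts.
        public String raw;

        // Public no-argument constructor, required for Flink to treat the class as a POJO.
        public CustomObject() {
        }

        public CustomObject(String raw) {
            this.raw = raw;
        }

        @Override
        public String toString() {
            return "CustomObject{raw='" + raw + "'}";
        }
    }

    // Flink tuples cannot hold null fields, so a missing key or value becomes "" (an assumption).
    private static String decode(byte[] bytes) {
        return bytes == null ? "" : new String(bytes, StandardCharsets.UTF_8);
    }

    @Override
    public Tuple2<String, CustomObject> deserialize(byte[] messageKey, byte[] message,
                                                    String topic, int partition, long offset) {
        return new Tuple2<>(decode(messageKey), new CustomObject(decode(message)));
    }

    @Override
    public boolean isEndOfStream(Tuple2<String, CustomObject> nextElement) {
        // Kafka topics are unbounded, so never signal the end of the stream.
        return false;
    }

    @Override
    public TypeInformation<Tuple2<String, CustomObject>> getProducedType() {
        // A String key plus whatever TypeExtractor derives for CustomObject.
        return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO,
                TypeExtractor.getForClass(CustomObject.class));
    }
}
```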

On 13.06.2017 17:18, AndreaKinn wrote:



Re: Can't get keyed messages from Kafka

AndreaKinn
Thanks, that works!