Hi,
I have already spent two days trying to get simple messages from Kafka, without success.

I have a Kafka producer written in JavaScript:

    KeyedMessage = kafka.KeyedMessage;
    keyed_message = new KeyedMessage(key, string_to_sent);
    payload = [{ topic: topic, messages: keyed_message }];

I want to retrieve both the key and the message in Flink. At first I used SimpleStringSchema (just to verify that the string was deserialized correctly), and it works, but it ignores the key. Now I absolutely need the key as well, to group messages by key, but I can't find a keyed deserializer that helps me. If I understand correctly, TypeSchema is used just for Flink-to-Flink communication, and JSONSchema obviously doesn't fit my needs.
Have you tried implementing a KeyedDeserializationSchema?
This receives both the message and key as byte arrays, which you could then deserialize as strings and return them in a Tuple2<String, String>.

On 13.06.2017 12:36, AndreaKinn wrote:
> [...]
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-t-get-keyed-messages-from-Kafka-tp13687.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
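The decoding step described above can be sketched without Flink, assuming the JavaScript producer sends both key and message as UTF-8 text. `KeyedStringDecoder` and `decode` are hypothetical names; `decode` takes the same two byte arrays that `KeyedDeserializationSchema.deserialize` receives, and `Map.Entry` stands in for Flink's `Tuple2<String, String>` so the example runs on the JDK alone. Kafka hands over a null key for records sent without one, so the key is checked before decoding.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.Map;

// Hypothetical helper class for illustration; not part of Flink or Kafka.
public class KeyedStringDecoder {

    // Same byte-array inputs as KeyedDeserializationSchema.deserialize(messageKey, message, ...).
    // Map.Entry stands in for Flink's Tuple2<String, String> so this runs on the JDK alone.
    public static Map.Entry<String, String> decode(byte[] messageKey, byte[] message) {
        // Records sent without a key arrive with a null key array, so guard before decoding.
        String key = (messageKey == null) ? null : new String(messageKey, StandardCharsets.UTF_8);
        String value = (message == null) ? null : new String(message, StandardCharsets.UTF_8);
        // SimpleImmutableEntry accepts null keys and values.
        return new AbstractMap.SimpleImmutableEntry<>(key, value);
    }

    public static void main(String[] args) {
        byte[] key = "sensor-1".getBytes(StandardCharsets.UTF_8);
        byte[] msg = "{\"temp\": 21.5}".getBytes(StandardCharsets.UTF_8);
        Map.Entry<String, String> record = decode(key, msg);
        System.out.println(record.getKey() + " -> " + record.getValue());
        System.out.println(decode(null, msg).getKey());
    }
}
```

Inside a real KeyedDeserializationSchema, the same body would go into `deserialize(...)` and return `new Tuple2<>(key, value)` instead of a map entry.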
But KeyedDeserializationSchema has just 2 implementations:

    TypeInformationKeyValueSerializationSchema
    JSONKeyValueDeserializationSchema

The first gives me the error below, while the second (JSON-based) one obviously doesn't fit my needs:

    06/12/2017 02:09:12 Source: Custom Source(4/4) switched to FAILED
    java.io.EOFException
        at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)

I'm thinking about implementing a custom deserializer, but honestly I'm a newbie and I don't know how to start.
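One likely explanation for the EOFException, not confirmed in the thread: TypeInformationKeyValueSerializationSchema decodes key and value with Flink's own type serializers, which expect the binary layout Flink itself writes (for strings, a length prefix followed by the characters), not the plain UTF-8 bytes a JavaScript producer sends. The stdlib-only sketch below reproduces the same kind of failure; `LengthPrefixMismatch` and `failsWithEof` are hypothetical names, and `DataInputStream.readUTF` is used purely as an analogy for a length-prefixed string format, not as Flink's actual encoding.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; DataInputStream.readUTF is only an analogy for a
// length-prefixed string format, not Flink's actual string encoding.
public class LengthPrefixMismatch {

    // Feeds plain UTF-8 bytes (as a non-Flink producer would send them) to a reader
    // that expects a length prefix. Returns true if the read hits end-of-input.
    public static boolean failsWithEof(String payload) {
        byte[] raw = payload.getBytes(StandardCharsets.UTF_8);
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(raw))) {
            // readUTF interprets the first two bytes as the string length, so the
            // first two characters are misread as a large length and the data runs out.
            in.readUTF();
            return false;
        } catch (EOFException e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("EOFException: " + failsWithEof("hello from kafka")); // prints "EOFException: true"
    }
}
```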
You have to create your own implementation that deserializes the byte
arrays into whatever type you want to use.

On 13.06.2017 13:19, AndreaKinn wrote:
> [...]
Can I ask you for help? I'm trying to implement a CustomDeserializer.
My Kafka messages are KeyedMessages whose key and message are both strings. I created a new class named CustomObject to handle the message string, because it's more complex than a simple string.

    import java.io.IOException;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

    public class CustomDeserializer implements KeyedDeserializationSchema<Tuple2<String, CustomObject>> {

        @Override
        public boolean isEndOfStream(Tuple2<String, CustomObject> nextElement) {
            return false;
        }

        @Override
        public TypeInformation<Tuple2<String, CustomObject>> getProducedType() {
            return null;
        }

        @Override
        public Tuple2<String, CustomObject> deserialize(byte[] messageKey, byte[] message,
                String topic, int partition, long offset) throws IOException {
            String key = new String(messageKey);
            String msg = new String(message);
            CustomObject customObj = new CustomObject(msg);

            Tuple2<String, CustomObject> tuple = new Tuple2<>(key, customObj);
            return tuple;
        }
    }

Questions:
- I don't understand what the getProducedType method does or why it is useful.
- Which methods do I have to implement in my CustomObject class?

My main:

    DataStream<Tuple2<String, CustomObject>> stream = env.addSource(
            new FlinkKafkaConsumer010<>("topicTest", new CustomDeserializer(), properties))
        .rebalance();

    stream.print();

If I execute it, I get a NullPointerException, so I imagine I'm missing something in the CustomObject class: I have implemented just a toString() method.
In getProducedType(), replace the implementation with:

    return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, TypeExtractor.getForClass(CustomObject.class));

On 13.06.2017 17:18, AndreaKinn wrote:
> [...]
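The second question (which methods CustomObject needs) is not answered in the thread. As a hedged sketch based on Flink's documented POJO rules: a class is analysed as a POJO if it is public and standalone (or a static nested class), has a public no-argument constructor, and every non-static, non-transient field is either public or reachable through a getter and setter; otherwise Flink falls back to its generic Kryo serializer, which still works but is less efficient. No other methods are required, `toString()` only affects `stream.print()`, and grouping by the String key in field 0 of the tuple does not need `equals`/`hashCode` on CustomObject. The `raw` field and its accessors below are hypothetical, since the real contents of CustomObject are not shown.

```java
// Sketch of a Flink-friendly POJO; the field and its accessors are hypothetical,
// since the thread does not show what CustomObject actually contains.
public class CustomObject {

    // Private field exposed through a public getter/setter pair, which Flink's
    // type extraction accepts for POJO fields (a public field would work too).
    private String raw;

    // Public no-argument constructor, required for Flink to treat the class as a POJO.
    public CustomObject() {
    }

    // Convenience constructor matching the call new CustomObject(msg) in the deserializer.
    public CustomObject(String raw) {
        this.raw = raw;
    }

    public String getRaw() {
        return raw;
    }

    public void setRaw(String raw) {
        this.raw = raw;
    }

    // Only affects how stream.print() renders records.
    @Override
    public String toString() {
        return "CustomObject{raw=" + raw + "}";
    }

    public static void main(String[] args) {
        System.out.println(new CustomObject("{\"temp\": 21.5}"));
    }
}
```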
Thanks, that works!