Hello,

why does Flink implement different serialization schemes for keyed and non-keyed messages for Kafka?

    props.put("partitioner.class", KafkaPartitioner.class.getCanonicalName());
    producer = new KafkaProducer<>(props);

And from the Flink side I can read it without a key with code like:

    DataStream<String> dataStream = env

As a result I get the pure message without a key. Actually, I need the key only for partitioning by Kafka, and I have an appropriate class for that: https://github.com/rssdev10/flink-kafka-streaming/blob/master/src/main/java/KafkaPartitioner.java . It uses the standard Java hash of the String class.

I also have another case: loading messages from Hadoop into Kafka. I'm using Flink for this purpose. All is OK when I'm using

    dataStream.addSink(new FlinkKafkaProducer08<>(config.getProperty("topic", Config.INPUT_TOPIC_NAME),

But I need partitioning in Kafka, so I changed it to

    TypeInformation<Tuple2<String, String>> stringStringInfo =

As a result I see that a message serialized by TypeInformationKeyValueSerializationSchema can be deserialized by Flink's SimpleStringSchema() or by Kafka's StringSerializer only with an additional first symbol. I guess this is the size of the String, which is added by org.apache.flink.types.StringValue#writeString. That is, the value of a message is no longer readable by Spark, Storm, or a Kafka consumer with standard deserialization.

The question: is this the correct behavior of Flink? And should I implement my own serializer and partitioner for Flink's Kafka sink if I want to use just simple String serialization that can be read by all other tools without Flink?

And a second question: why does Flink require implementing a custom partitioner for the serialized byte[] stream instead of using the primary objects, as in Kafka's partitioner? Or instead of just allowing the use of Kafka's partitioner class?

PS: I can give a link to the sources if you have access to the https://github.com/stratosphere/ private repos.

Thanks, best regards
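To make the symptom described above concrete, here is a minimal sketch in plain Java with no Flink or Kafka dependency. It assumes a simplified one-byte length prefix purely for illustration; this is not Flink's exact StringValue encoding. The class and method names (PlainStringKafkaBytes, plainUtf8, lengthPrefixed, partitionFor) are hypothetical, and partitionFor is only one possible way to map a String hash to a partition, not necessarily what the linked KafkaPartitioner does.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper class, for illustration only.
final class PlainStringKafkaBytes {

    // Plain UTF-8 bytes: the layout a consumer using Kafka's default
    // String deserialization expects.
    static byte[] plainUtf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // Simplified length-prefixed layout (ASSUMPTION: a single length byte,
    // not Flink's real format): one byte holding the length, then the text.
    static byte[] lengthPrefixed(String s) {
        byte[] body = s.getBytes(StandardCharsets.UTF_8);
        if (body.length > 127) {
            throw new IllegalArgumentException("sketch only handles short strings");
        }
        byte[] out = new byte[body.length + 1];
        out[0] = (byte) body.length;
        System.arraycopy(body, 0, out, 1, body.length);
        return out;
    }

    // Maps a key to a partition via the standard String hash.
    // floorMod keeps the result non-negative for negative hash codes.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        String value = "hello";

        // A reader that treats the prefixed bytes as plain UTF-8 sees one
        // extra leading character (the length byte) before the text.
        String misread = new String(lengthPrefixed(value), StandardCharsets.UTF_8);
        System.out.println(misread.length());            // 6 instead of 5
        System.out.println(misread.substring(1));        // hello

        // Plain UTF-8 bytes round-trip without any extra character.
        String clean = new String(plainUtf8(value), StandardCharsets.UTF_8);
        System.out.println(clean.equals(value));         // true

        System.out.println(partitionFor("a", 4));        // 1, since "a".hashCode() is 97
    }
}
```

A custom serializer for the Kafka sink that emits key and value in the plainUtf8 layout would presumably stay readable by the other tools mentioned, at the cost of losing the generic type-based encoding.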
Hi Rss,

> why Flink implements different serialization schemes for keyed and non keyed messages for Kafka?

The non-keyed serialization schema is a basic schema that works for most use cases. For advanced users who need access to the key, offsets, the partition, or the topic, there's the keyed serialization schema. The keyed schema is richer and can completely subsume the simple, non-keyed one.

> As a result I see that a message which are serialized by TypeInformationKeyValueSeriali

The TypeInformationKeyValueSeriali

> The question, is it correct behavior of Flink? And should I implement own serializer and partitioner for Flink's Kafka sink if I want to use just simple String serialization which may be read by all other tools without Flink?

The behavior is correct. If the SimpleStringSchema is not sufficient for the other systems, you need to implement your own serializer.

> And second question, why Flink requires to implement a custom partitioner for serialized byte[] stream instead of using of primary objects as in Kafka's partitioner? Or instead of just allowing to use Kafka's partitioner class.

If you do not specify a Flink partitioner, we'll use the configured Kafka partitioner. The advantage of using Flink's own partitioner is that you can access information like the subtaskId and the number of subtasks.

Regards,
Robert

On Sun, Aug 28, 2016 at 6:16 PM, rss rss <[hidden email]> wrote: