different Kafka serialization for keyed and non-keyed messages


different Kafka serialization for keyed and non-keyed messages

rss rss
Hello,

  Why does Flink implement different serialization schemes for keyed and non-keyed messages for Kafka?

  I'm loading messages into Kafka in two ways. The first is on-the-fly loading without Flink, using only Kafka's own producer API. In this case I'm using something like:
props.put("partitioner.class", KafkaPartitioner.class.getCanonicalName());
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(props);
String key = event.getUserId();
String value = DummyEvent.eventToString(event);
producer.send(new ProducerRecord<>(topic, key, value));

 And on the Flink side I can read it, without the key, with code like:
DataStream<String> dataStream = env
    .addSource(new FlinkKafkaConsumer08<String>(
        "topic",
        new SimpleStringSchema(), kafkaProps));
As a result I get the plain message value, without the key. Actually I need the key only for Kafka's partitioning, and I have an appropriate partitioner class: https://github.com/rssdev10/flink-kafka-streaming/blob/master/src/main/java/KafkaPartitioner.java . It is the standard Java hash for the String class.
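 The idea of that partitioner is roughly the following (a sketch only; the real class is in the repo above, and this sketch assumes the org.apache.kafka.clients.producer.Partitioner interface of the newer producer API):

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class KafkaPartitioner implements Partitioner {
    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // standard Java String hash, mapped to a non-negative partition index
        // (assumes a non-null String key)
        return (key.toString().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    @Override
    public void close() {}
}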


  I also have another case: loading messages from Hadoop into Kafka, and I'm using Flink for this purpose. Everything is fine when I'm using
dataStream.addSink(new FlinkKafkaProducer08<>(
    config.getProperty("topic", Config.INPUT_TOPIC_NAME),
    new SimpleStringSchema(),
    kafkaProps));
But I need partitioning in Kafka, so I changed it to
TypeInformation<Tuple2<String, String>> stringStringInfo =
    TypeInfoParser.parse("org.apache.flink.api.java.tuple.Tuple2<String, String>");

KeyedSerializationSchema<Tuple2<String, String>> schema =
    new TypeInformationKeyValueSerializationSchema<>(String.class, String.class, env.getConfig());

dataStream
    .map(json -> {
        Event event = gson.fromJson(json, Event.class);
        return new Tuple2<String, String>(event.getUserId(), json);
    }).returns(stringStringInfo)
    .setParallelism(partitions)
    .addSink(new FlinkKafkaProducer08<>(
        config.getProperty("topic", Config.INPUT_TOPIC_NAME),
        schema,
        kafkaProps));
As a result I see that a message serialized by TypeInformationKeyValueSerializationSchema can be deserialized by Flink's SimpleStringSchema or by Kafka's StringDeserializer only with an extra leading symbol. I guess this is the length prefix of the String which org.apache.flink.types.StringValue#writeString adds. That means the message value is no longer readable by Spark, Storm, or a plain Kafka consumer with standard deserialization...
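For example, serializing a short value and printing the raw bytes (a sketch, reusing the schema object from above) shows the prefix:

byte[] raw = schema.serializeValue(new Tuple2<>("user-1", "hello"));
System.out.println(java.util.Arrays.toString(raw));
// prints something like [6, 104, 101, 108, 108, 111]:
// one variable-length prefix byte (length + 1 = 6),
// followed by the character bytes of "hello"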

   The question: is this correct behavior for Flink? And should I implement my own serializer and partitioner for Flink's Kafka sink if I want plain String serialization that can be read by all other tools without Flink?

   And a second question: why does Flink require implementing a custom partitioner over the serialized byte[] stream instead of over the original objects, as in Kafka's partitioner? Or instead of simply allowing Kafka's partitioner class to be used?

  PS: I can give a link to the sources if you have access to the private repos at https://github.com/stratosphere/ .

Thanks,
best regards

Re: different Kafka serialization for keyed and non-keyed messages

rmetzger0
Hi Rss,

> Why does Flink implement different serialization schemes for keyed and non-keyed messages for Kafka?

The non-keyed serialization schema is a basic schema that works for most use cases.
For advanced users who need access to the key, the offsets, the partition, or the topic, there is the keyed serialization schema.
The keyed schema is richer and completely subsumes the simple, non-keyed one.

> As a result I see that a message serialized by TypeInformationKeyValueSerializationSchema can be deserialized by Flink's SimpleStringSchema or by Kafka's StringDeserializer only with an extra leading symbol.

The TypeInformationKeyValueSerializationSchema is only meant to be used for Flink <--> Flink communication through Kafka, because it depends on Flink's internal serializers (it might even depend on the exact ExecutionConfig settings).
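For completeness, reading those pairs back in Flink would look roughly like this (a sketch reusing the variable names from your mail; the schema implements the deserialization side as well):

TypeInformationKeyValueSerializationSchema<String, String> kvSchema =
    new TypeInformationKeyValueSerializationSchema<>(String.class, String.class, env.getConfig());

DataStream<Tuple2<String, String>> keyedStream = env.addSource(
    new FlinkKafkaConsumer08<>("topic", kvSchema, kafkaProps));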


> The question: is this correct behavior for Flink? And should I implement my own serializer and partitioner for Flink's Kafka sink if I want plain String serialization that can be read by all other tools without Flink?

The behavior is correct. If the SimpleStringSchema is not sufficient for the other systems, you need to implement your own serializer.
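Such a serializer can be quite small. Here is a minimal sketch (the class name is made up for the example; it assumes the KeyedSerializationSchema interface of the 1.x Kafka connector) that writes key and value as raw UTF-8 bytes, so any consumer with a plain StringDeserializer can read them:

import java.nio.charset.StandardCharsets;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

public class PlainStringKeyValueSchema
        implements KeyedSerializationSchema<Tuple2<String, String>> {

    @Override
    public byte[] serializeKey(Tuple2<String, String> element) {
        return element.f0.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serializeValue(Tuple2<String, String> element) {
        return element.f1.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(Tuple2<String, String> element) {
        return null; // null means: use the topic configured on the producer
    }
}

You can pass it to the FlinkKafkaProducer08 constructor in place of the TypeInformationKeyValueSerializationSchema from your mail.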

> And a second question: why does Flink require implementing a custom partitioner over the serialized byte[] stream instead of over the original objects, as in Kafka's partitioner? Or instead of simply allowing Kafka's partitioner class to be used?

If you are not specifying any Flink partitioner, we'll use the configured Kafka partitioner. 
The advantage of using Flink's own partitioner is that you can access information like the subtaskId and the number of subtasks.
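For example, a Flink partitioner that reproduces your String-hash behavior over the serialized key, together with a plain UTF-8 key schema like the one above, could look roughly like this (a sketch, assuming the KafkaPartitioner base class shipped with the 1.x connector; an instance can be passed as an additional constructor argument to FlinkKafkaProducer08):

import java.nio.charset.StandardCharsets;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;

public class StringHashPartitioner<T> extends KafkaPartitioner<T> {
    @Override
    public int partition(T next, byte[] serializedKey,
                         byte[] serializedValue, int numPartitions) {
        // rebuild the String key and apply the standard Java hash,
        // matching the partitioning of the plain Kafka producer
        String key = new String(serializedKey, StandardCharsets.UTF_8);
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}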

Regards,
Robert



