Hi all,
I am trying to use Flink to consume Avro data from Kafka, with the schemas stored in the Kafka Schema Registry. Since the Kafka producer is a completely separate service (an MQTT consumer sinking into Kafka), I can't have the schema available on the consumer side. I read around and ended up with the following implementation of KeyedDeserializationSchema, but I cannot understand why it throws a `com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException`:

    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
    import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroDeserializer, KafkaAvroDeserializerConfig}
    import org.apache.avro.generic.GenericRecord
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.java.typeutils.TypeExtractor
    import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema

    import scala.collection.JavaConverters._

    class AvroDeserializationSchema(schemaRegistryUrl: String) extends KeyedDeserializationSchema[GenericRecord] {

      // Flink needs the deserializer to be serializable => this "@transient lazy val" does the trick
      @transient lazy val valueDeserializer = {
        val deserializer = new KafkaAvroDeserializer(
          new CachedSchemaRegistryClient(schemaRegistryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT))
        deserializer.configure(
          Map(
            AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl,
            KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> false).asJava,
          false)
        deserializer
      }

      override def isEndOfStream(nextElement: GenericRecord): Boolean = false

      override def deserialize(messageKey: Array[Byte], message: Array[Byte],
                               topic: String, partition: Int, offset: Long): GenericRecord = {
        // val key = keyDeserializer.deserialize(topic, messageKey).asInstanceOf[String]
        valueDeserializer.deserialize(topic, message).asInstanceOf[GenericRecord]
      }

      override def getProducedType: TypeInformation[GenericRecord] =
        TypeExtractor.getForClass(classOf[GenericRecord])
    }
I have no clue how to go about solving this. I saw a lot of people trying to implement the same thing. If someone can guide me, it'd be really helpful.
Thanks!
Nitish
Hi Nitish,

Kryo is Flink's fallback serializer for when everything else fails. In general, performance suffers quite a bit, and it's not always applicable, as in your case. Especially in production code, it's best to avoid it completely.

In your case, the issue is that the provided type information is effectively meaningless: getProducedType does not provide any actual type information, just a reference to a generic skeleton. Flink uses the type information to reason about the value structure, which it cannot do in your case.

If you really need to resort to a completely generic serializer (which is usually not needed), then you have a few options:

* Easiest: stick to byte[] and convert in a downstream UDF (as sketched below). If it's that generic, you probably have only a simple transformation before outputting it into some generic Kafka sink. So your UDF deserializes, does some generic stuff, and immediately turns the record back into byte[].
* Implement your own generic TypeInformation with a serializer. WritableTypeInfo [1] is a generic example of how to do it. This will automatically convert byte[] back and forth to GenericRecord. That would be the recommended way when you have multiple transformations between source and sink.

On Mon, Mar 2, 2020 at 8:44 AM Nitish Pant <[hidden email]> wrote:
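A minimal sketch of the byte[] approach described above, assuming the Confluent KafkaAvroDeserializer/KafkaAvroSerializer are on the classpath; the class name, topic handling, and the pass-through structure are only illustrative, not tested code:

    import io.confluent.kafka.serializers.{KafkaAvroDeserializer, KafkaAvroSerializer}
    import org.apache.avro.generic.GenericRecord
    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.configuration.Configuration

    import scala.collection.JavaConverters._

    // Sketch: the Kafka source emits plain byte[] (which Flink serializes natively,
    // so no custom TypeInformation is involved) and all Avro handling happens
    // inside this one UDF.
    class GenericAvroPassThrough(schemaRegistryUrl: String, topic: String)
        extends RichMapFunction[Array[Byte], Array[Byte]] {

      @transient private var deserializer: KafkaAvroDeserializer = _
      @transient private var serializer: KafkaAvroSerializer = _

      override def open(parameters: Configuration): Unit = {
        val config = Map("schema.registry.url" -> schemaRegistryUrl).asJava
        deserializer = new KafkaAvroDeserializer()
        deserializer.configure(config, false) // false = configure as value (not key) deserializer
        serializer = new KafkaAvroSerializer()
        serializer.configure(config, false)
      }

      override def map(raw: Array[Byte]): Array[Byte] = {
        // Deserialize against the registry, do the "generic stuff" on the record ...
        val record = deserializer.deserialize(topic, raw).asInstanceOf[GenericRecord]
        // ... and immediately turn it back into bytes for a generic Kafka sink.
        serializer.serialize(topic, record)
      }
    }

On the source side, a trivial DeserializationSchema that simply returns the raw message bytes unchanged would be enough to feed this UDF.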
Hi Nitish,

Just to slightly extend on Arvid's reply: as Arvid said, the Kryo serializer comes from the call to TypeExtractor.getForClass(classOf[GenericRecord]). As a GenericRecord is not a POJO, this call produces a GenericTypeInfo, which uses Kryo serialization.

For a reference example I would recommend having a look at AvroDeserializationSchema. There we use GenericRecordAvroTypeInfo for working with GenericRecords (a rough Scala sketch follows below).

One important note: GenericRecords are not the best candidates for data objects in Flink. The reason is that if you apply any transformation on a GenericRecord, e.g. map/flatMap, the input type information cannot be forwarded, as the transformation is a black box from Flink's perspective. Therefore you would need to provide the type information for every step of the pipeline:

    TypeInformation<?> info = ...
    sEnv.addSource(...)   // produces info
        .map(...)
        .returns(info)    // must be provided again: the map transformation is a black box
                          // and might produce a completely different record

Hope that helps a bit.

Best,
Dawid

On 02/03/2020 09:04, Arvid Heise wrote:
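For reference, a rough Scala sketch of what this suggestion could look like, assuming a reader schema can be obtained up front; the record name and field in the schema string are made up, and GenericRecordAvroTypeInfo comes from the flink-avro module:

    import org.apache.avro.Schema
    import org.apache.avro.generic.GenericRecord
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo

    object GenericRecordTypeInfoExample {

      // A reader schema obtained up front (hard-coded here purely for illustration).
      val readerSchema: Schema = new Schema.Parser().parse(
        """{"type":"record","name":"Sensor","fields":[{"name":"id","type":"string"}]}""")

      // Avro-aware type information instead of the Kryo-backed GenericTypeInfo that
      // TypeExtractor.getForClass(classOf[GenericRecord]) produces.
      implicit val genericRecordInfo: TypeInformation[GenericRecord] =
        new GenericRecordAvroTypeInfo(readerSchema)

      // getProducedType in a custom schema would then return genericRecordInfo, and with
      // the Scala API an implicit like this can be picked up by downstream map/flatMap
      // calls instead of falling back to Kryo.
    }

This mirrors what the built-in AvroDeserializationSchema mentioned above does with its produced type.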
Hi,
Thanks for the replies. I get that it is not wise to use GenericRecord and that this is what causes the Kryo fallback, but if not this, how should I go about writing an AvroSchemaRegistrySchema when I don't know the schema? Without knowledge of the schema, I can't create a class. Can you suggest a way of getting around that? Thanks!
Could you please give more background on your use case? It's hard to give any advice with the little information you have given us. Usually, the consumer should know the schema, or else it's hard to do meaningful processing. If it's something completely generic, then there is no way around it, but that should be the last resort. Here the recommendations from my first response would come into play. If they don't work for you for some reason, please let me know why and I can try to come up with a solution.

On Mon, Mar 2, 2020 at 10:27 AM Nitish Pant <[hidden email]> wrote:
Hi,
So I am building a data pipeline that takes input from sensors via an MQTT broker and passes it to Kafka. Before it goes to Kafka, I filter the data and serialize the filtered data into Avro format, keeping the schema in the registry. Now I want to get that data into Flink to process it with some algorithms. So, at the FlinkKafkaConsumer end, I currently don't have the schemas for my data. One workaround would be to fetch the schema corresponding to the data of a given topic separately from the registry and then work forward, but I was hoping there would be a way to avoid this and integrate the schema registry with my consumer in some way, like kafka-connect does. This is why I was trying this solution. Do you think I should go with the workaround method, since implementing support for GenericRecord would be more of an overhead in the longer run? Thanks!
I didn't get the use case completely. Are you using several sensors with different schemas? Are you processing them jointly? Let's assume some cases:

1) Only one format: it would be best to generate a case class with avrohugger. That is especially true if your processing actually requires specific fields to be present.
2) Several sensors, but processed independently: you could do the same as 1) for all sensors. If you don't need to access specific fields, you should fetch the latest schema in your main() and use the things that Flink already provides (see the sketch after this list).
3) You have constantly changing schemas and want to always forward records with the latest schema, enriched with some fields: you need to stick to GenericRecord. I'd go with the byte[] approach from my first response if you only have one such application / processing step.
4) Else, go with the custom TypeInfo/serializer. We can help you to implement it. If you do implement it yourself, it'd be awesome to post it as a response here for other users.

On Mon, Mar 2, 2020 at 11:01 AM Nitish Pant <[hidden email]> wrote:
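To make case 2 concrete, here is a rough, untested sketch of fetching the latest schema in main() and relying on the flink-avro-confluent-registry module's ConfluentRegistryAvroDeserializationSchema; the registry URL, bootstrap servers, topic, group id, and the "id" field are made-up placeholders, and the default "<topic>-value" subject naming strategy is assumed:

    import java.util.Properties

    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
    import org.apache.avro.Schema
    import org.apache.avro.generic.GenericRecord
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
    import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

    object SensorJob {
      def main(args: Array[String]): Unit = {
        val registryUrl = "http://schema-registry:8081" // illustrative
        val topic = "sensor-readings"                   // illustrative

        // Fetch the latest schema for the topic's value subject once, in main().
        val client = new CachedSchemaRegistryClient(registryUrl, 100)
        val schema = new Schema.Parser().parse(
          client.getLatestSchemaMetadata(s"$topic-value").getSchema)

        // Ready-made deserialization schema from flink-avro-confluent-registry,
        // so no hand-written KeyedDeserializationSchema is required.
        val deserializer = ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, registryUrl)

        // Avro-aware type information, so downstream operators don't fall back to Kryo.
        implicit val recordInfo: TypeInformation[GenericRecord] = new GenericRecordAvroTypeInfo(schema)

        val props = new Properties()
        props.setProperty("bootstrap.servers", "kafka:9092") // illustrative
        props.setProperty("group.id", "sensor-flink-consumer")

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.addSource(new FlinkKafkaConsumer[GenericRecord](topic, deserializer, props))
          .map(record => record.get("id").toString) // placeholder processing; "id" is a made-up field
          .print()

        env.execute("generic avro from schema registry")
      }
    }

The same pattern can be repeated per topic if several topics are meant to share the consumer code.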
Hi Arvid,
It's actually the second case. I just wanted to build a scalable, generic setup where I can pass in a set of Kafka topics and have my consumer use the same AvroDeserializationSchema. But yeah, I think I'll do the fetching of the latest schema in main() separately. Thanks for the help!