Hi everyone.
How can I get entries in GenericRecord format from a Kafka topic using the Schema Registry? I read this: https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html but couldn't get it to work in my code. Are there any tutorials or examples for deserialising data using schema.registry.url?

Thanks
I tried this:
1. Schema (found on Stack Overflow):

    import java.util.HashMap;
    import java.util.Map;

    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
    import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.TypeExtractor;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    class GenericRecordSchema implements KafkaDeserializationSchema<GenericRecord> {

        private String registryUrl;
        private transient KafkaAvroDeserializer deserializer;

        public GenericRecordSchema(String registryUrl) {
            this.registryUrl = registryUrl;
        }

        @Override
        public boolean isEndOfStream(GenericRecord nextElement) {
            return false;
        }

        @Override
        public GenericRecord deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
            checkInitialized();
            return (GenericRecord) deserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
        }

        @Override
        public TypeInformation<GenericRecord> getProducedType() {
            return TypeExtractor.getForClass(GenericRecord.class);
        }

        // Created lazily because KafkaAvroDeserializer is not serializable and
        // therefore has to be instantiated on the task manager, not on the client.
        private void checkInitialized() {
            if (deserializer == null) {
                Map<String, Object> props = new HashMap<>();
                props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
                props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
                SchemaRegistryClient client = new CachedSchemaRegistryClient(
                        registryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
                deserializer = new KafkaAvroDeserializer(client, props);
            }
        }
    }

2. Consumer:

    private static FlinkKafkaConsumer<GenericRecord> getConsumer(String topic) {
        return new FlinkKafkaConsumer<>(
                topic,
                new GenericRecordSchema("http://xxx.xx.xxx.xx:8081"),
                getConsumerProperties());
    }

But when I start the app, the following error happens:

    com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
    Serialization trace:
    reserved (org.apache.avro.Schema$Field)
    fieldMap (org.apache.avro.Schema$RecordSchema)
    schema (org.apache.avro.generic.GenericData$Record)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
    Caused by: java.lang.UnsupportedOperationException
        at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        ... 26 more

It is not solved by:

    env.getConfig().disableForceKryo();
    env.getConfig().enableForceAvro();

Any ideas? Thanks
Hi Maminspapin,

I haven't worked with Kafka/Flink yet, but have you had a look at the docs about the DeserializationSchema [1]? They mention ConfluentRegistryAvroDeserializationSchema. Is this something you're looking for? See the sketch below for how it might be wired up.

Best,
Matthias
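A minimal sketch, assuming the flink-avro-confluent-registry dependency is available; the inline reader schema here is a made-up placeholder, not anything from this thread:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;

    // Placeholder reader schema -- in practice, use the schema registered for your subject.
    Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"MyRecord\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");

    // Looks up writer schemas in the registry and yields records as GenericRecord.
    DeserializationSchema<GenericRecord> deserializer =
            ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, "http://xxx.xx.xxx.xx:8081");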
Ok, it looks like you've found that solution already based on your question in [1].
The issue at hand is that the record contains an unmodifiable collection which the Kryo serialiser attempts to modify by first initialising the object and then adding items to the collection (iirc):

    Caused by: java.lang.UnsupportedOperationException

Without knowing the specifics of what exactly you are trying to deserialise, I can only attempt to give a generic answer, which is to try something like the sketch after this message:

    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

An even better approach is to set up a local sandbox environment in Docker with Kafka and a sink of your choice, and simply run the application from the main method in debug mode with a breakpoint set right before it throws the exception.

Kind regards,
Arian Rohani
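One generic workaround along those lines is a sketch, not the original (truncated) snippet, and it assumes the flink-avro module is on the classpath: register a Kryo serializer that round-trips Avro Schema objects through their JSON string form instead of copying their immutable internal collections field by field.

    import org.apache.avro.Schema;
    import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    // Assumption: flink-avro is on the classpath. AvroSchemaSerializer serialises
    // a Schema as its string representation and re-parses it on read, avoiding the
    // unmodifiable collections inside org.apache.avro.Schema that plain Kryo trips over.
    see.getConfig().addDefaultKryoSerializer(
            Schema.class, AvroKryoSerializerUtils.AvroSchemaSerializer.class);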
Arian gave good pointers, but I'd go even further: you should have ITCases where you pretty much just execute a mini job with docker-based Kafka and run it automatically. I strongly recommend checking out testcontainers [1]; it makes writing such a test a really smooth experience, roughly along the lines of the sketch below.
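A rough skeleton of such an ITCase, assuming the org.testcontainers:kafka module and JUnit 4 on the test classpath (class name, image tag, and test body are placeholders; a Schema Registry container would be needed alongside the broker for the Confluent deserialiser):

    import org.junit.ClassRule;
    import org.junit.Test;
    import org.testcontainers.containers.KafkaContainer;
    import org.testcontainers.utility.DockerImageName;

    public class KafkaJobITCase {

        // Starts a real Kafka broker in Docker once for the whole test class.
        @ClassRule
        public static final KafkaContainer KAFKA =
                new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));

        @Test
        public void jobConsumesGenericRecords() throws Exception {
            String bootstrapServers = KAFKA.getBootstrapServers();
            // 1. Produce a few Avro test records to a topic on bootstrapServers.
            // 2. Run the Flink job against that broker with a collecting test sink.
            // 3. Assert on the collected output.
        }
    }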
Thank you Arvid, I was going to suggest something like this also. We use TestContainers and the Docker images provided by Ververica to do exactly this in our team. I am currently working on a small project on GitHub to start sharing for use cases like this. The project will contain some example sources and example sinks together with a generic Flink application. I will follow up sometime during the weekend with a PoC. It's super straightforward to set up and use. To elaborate a bit more on Arvid's suggestion:
In fact, one of the major benefits is that you simply configure the source and sink and run the application outside of Docker (as a LocalStreamEnvironment), as in the sketch after this message. This enables you to set breakpoints where the application is throwing the exception, which is especially valuable in circumstances like this where the stacktrace is not super descriptive.

Best,
Arian Rohani
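A minimal sketch of that local-debug setup; getConsumer(...) is the factory from the first post, and the topic name is a placeholder:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // Runs the whole pipeline inside the current JVM, so IDE breakpoints inside
    // the DeserializationSchema are hit directly.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
    env.addSource(getConsumer("my-topic")).print();
    env.execute("local-debug");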
Thank you all very much!
The problem is solved using the ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, "http://xxx.xx.xxx.xx:8081") method, as sketched below. http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Proper-way-to-get-DataStream-lt-GenericRecord-gt-td42640.html

But I want to explore your notes. So many new things for me ))

Thanks!
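For completeness, a sketch of the fix in context; the .avsc path and topic name are placeholders, and getConsumerProperties() is the helper from the first post:

    import java.io.File;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    // Reader schema loaded from a local .avsc file (placeholder path).
    Schema schema = new Schema.Parser().parse(new File("my-record.avsc"));

    FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
            "my-topic",
            ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, "http://xxx.xx.xxx.xx:8081"),
            getConsumerProperties());

This also explains why the original Kryo error disappears: forGeneric exposes the records with GenericRecordAvroTypeInfo, so Flink ships them with its Avro serializer, whereas the hand-rolled schema's TypeExtractor.getForClass(GenericRecord.class) produced a generic type that fell back to Kryo.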