DataStream<GenericRecord> from kafka topic


DataStream<GenericRecord> from kafka topic

Maminspapin
Hi everyone.

How can I get entries in GenericRecord format from a Kafka topic using the
Schema Registry?
I read this:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
but I can't get it to work in my code...

Are there any tutorials or examples for deserialising data using
schema.registry.url?

Thanks




Re: DataStream<GenericRecord> from kafka topic

Maminspapin
I tried this:

1. Schema (found on Stack Overflow)

import java.util.HashMap;
import java.util.Map;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

class GenericRecordSchema implements KafkaDeserializationSchema<GenericRecord> {

    private final String registryUrl;
    // KafkaAvroDeserializer is not serializable, so it is created lazily on the task.
    private transient KafkaAvroDeserializer deserializer;

    public GenericRecordSchema(String registryUrl) {
        this.registryUrl = registryUrl;
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false;
    }

    @Override
    public GenericRecord deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
        checkInitialized();
        return (GenericRecord) deserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        // GenericRecord is just an interface without a fixed schema, so Flink
        // treats this as a generic type and falls back to Kryo for it.
        return TypeExtractor.getForClass(GenericRecord.class);
    }

    private void checkInitialized() {
        if (deserializer == null) {
            Map<String, Object> props = new HashMap<>();
            props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
            props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
            SchemaRegistryClient client =
                    new CachedSchemaRegistryClient(
                            registryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
            deserializer = new KafkaAvroDeserializer(client, props);
        }
    }
}

2. Consumer

private static FlinkKafkaConsumer<GenericRecord> getConsumer(String topic) {
    return new FlinkKafkaConsumer<>(
            topic,
            new GenericRecordSchema("http://xxx.xx.xxx.xx:8081"),
            getConsumerProperties());
}

But when I start the app, the following error occurs:

com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
Caused by: java.lang.UnsupportedOperationException
        at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        ... 26 more


The following settings did not solve it:
env.getConfig().disableForceKryo();
env.getConfig().enableForceAvro();


Any idea?

Thanks





Re: DataStream<GenericRecord> from kafka topic

Matthias
Hi Maminspapin,
I haven't worked with Kafka/Flink yet, but have you had a look at the docs about the DeserializationSchema [1]? They mention ConfluentRegistryAvroDeserializationSchema. Is this what you're looking for?
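
For reference, a minimal sketch of how it could be wired in (the schema string, topic, and getConsumerProperties() are placeholders following the earlier snippets; forGeneric expects the reader schema up front, and the class ships in the flink-avro-confluent-registry artifact):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

private static FlinkKafkaConsumer<GenericRecord> getRegistryConsumer(String topic, String schemaString) {
    // Reader schema for the records on the topic (placeholder string).
    Schema readerSchema = new Schema.Parser().parse(schemaString);
    return new FlinkKafkaConsumer<>(
            topic,
            ConfluentRegistryAvroDeserializationSchema.forGeneric(readerSchema, "http://xxx.xx.xxx.xx:8081"),
            getConsumerProperties());
}

Because the produced type then carries the Avro schema, this should also avoid the Kryo fallback seen in the stack trace above.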

Best,
Matthias



Re: DataStream<GenericRecord> from kafka topic

Matthias
OK, it looks like you've already found that solution, based on your question in [1].



Re: DataStream<GenericRecord> from kafka topic

Arian Rohani
In reply to this post by Matthias
The issue at hand is that the record contains an unmodifiable collection, which the Kryo serialiser attempts to modify by first initialising the object and then adding items to the collection (iirc).

Caused by: java.lang.UnsupportedOperationException
        at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)

Without knowing the specifics of what exactly you are trying to deserialise, I can only give a generic answer, which is to try something like:

StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
// UnmodifiableCollectionsSerializer comes from the kryo-serializers library
// (de.javakaffee:kryo-serializers). Note that Class.forName throws a checked
// ClassNotFoundException, so wrap or declare it.
Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
see.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);

An even better approach is to set up a local sandbox environment in Docker with Kafka and a sink of your choice, run the application from the main method in debug mode, and set a breakpoint right before it throws the exception.

Kind regards,
Arian Rohani



Re: DataStream<GenericRecord> from kafka topic

Arvid Heise-4
Arian gave good pointers, but I'd go even further: you should have ITCases where you pretty much just execute a mini job against docker-based Kafka and run it automatically.
I strongly recommend checking out Testcontainers [1]; it makes writing such a test a really smooth experience.
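
As a rough sketch of such an ITCase (JUnit 5; the image tag, class name, and the trivial stand-in pipeline are assumptions, and the Schema Registry container is omitted):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
class KafkaReadITCase {

    // Starts a throwaway Kafka broker in Docker for the duration of the test class.
    @Container
    static final KafkaContainer KAFKA =
            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));

    @Test
    void jobReadsFromDockerizedKafka() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // A real ITCase would build the production pipeline here, point its
        // Kafka source at KAFKA.getBootstrapServers(), produce a few test
        // records, and assert on the job's output. A trivial stand-in keeps
        // this sketch self-contained and runnable.
        env.fromElements(KAFKA.getBootstrapServers()).print();
        env.execute("kafka-itcase");
    }
}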




Re: DataStream<GenericRecord> from kafka topic

Arian Rohani
Thank you Arvid, I was going to suggest something like this as well.
We use Testcontainers and the Docker images provided by Ververica to do exactly this on our team.

I am currently working on a small project on GitHub to start sharing for use cases like this.
The project will contain some example sources and example sinks together with a generic Flink application.
I will follow up sometime during the weekend with a PoC. It's super straightforward to set up and use.

To elaborate a bit more on Arvid's suggestion:
  • Use Testcontainers as a base to configure your integration test.
  • LocalStack is a fully functional Docker container that you can use to mock various AWS services. Since it's unclear what sink you're using, I just want to throw this out there.
  • Set up two containers abstracting the job manager and task manager according to this documentation. If you decide to go with the application-cluster route, then I suggest setting up the task manager and job manager as GenericContainers. The rationale is that if you do everything in docker-compose and use a DockerComposeContainer, the application will start before you have a chance to mock the data in your source, as the DockerComposeContainer is started immediately iirc (which may be problematic depending on the way your application is configured to read from Kafka).
In fact, one of the major benefits is that you simply configure the source and sink and run the application outside of Docker (as a LocalStreamEnvironment).
This enables you to set breakpoints where the application is throwing the exception, which is especially valuable in circumstances like this where the stacktrace is not super descriptive.
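
As a small illustration of that last point (buildPipeline and the topic name are hypothetical placeholders for however the job wiring is factored out):

// Run the same pipeline in a local environment so breakpoints inside the
// deserializer are hit directly in the IDE.
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
buildPipeline(env, "my-topic"); // hypothetical helper that attaches source, operators and sink
env.execute("local-debug-run");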

Best,
Arian Rohani



Re: DataStream<GenericRecord> from kafka topic

Maminspapin
In reply to this post by Maminspapin