I am trying to connect to the Schema Registry and deserialize my object.

I am building my project with Maven. On build I get the error:

    ... class file for kafka.utils.VerifiableProperties not found ...

    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
    import io.confluent.kafka.serializers.KafkaAvroDecoder;
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.TypeExtractor;

    import java.io.IOException;

    public class ConfluentAvroDeserializationSchema implements DeserializationSchema<CelloAvro> {

        private final String schemaRegistryUrl;
        private final int identityMapCapacity;
        // Created lazily on the first call to deserialize()
        private KafkaAvroDecoder kafkaAvroDecoder;

        public ConfluentAvroDeserializationSchema(String schemaRegistryUrl) {
            this(schemaRegistryUrl, 1000);
        }

        public ConfluentAvroDeserializationSchema(String schemaRegistryUrl, int identityMapCapacity) {
            this.schemaRegistryUrl = schemaRegistryUrl;
            this.identityMapCapacity = identityMapCapacity;
        }

        @Override
        public CelloAvro deserialize(byte[] bytes) throws IOException {
            if (kafkaAvroDecoder == null) {
                SchemaRegistryClient schemaRegistry =
                        new CachedSchemaRegistryClient(this.schemaRegistryUrl, this.identityMapCapacity);
                this.kafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry);
            }
            return (CelloAvro) this.kafkaAvroDecoder.fromBytes(bytes);
        }

        @Override
        public boolean isEndOfStream(CelloAvro celloAvro) {
            return false;
        }

        @Override
        public TypeInformation<CelloAvro> getProducedType() {
            return TypeExtractor.getForClass(CelloAvro.class);
        }
    }

My dependencies are:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-avro</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>4.0.0</version>
    </dependency>

Could someone please help? I see there is an open issue for an end-to-end test with Confluent's Schema Registry:

https://issues.apache.org/jira/browse/FLINK-8970

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Hi,

It looks to me like kafka.utils.VerifiableProperties comes from the org.apache.kafka:kafka package. Please check your pom.xml for dependency conflicts involving this package and resolve them if possible. There is probably a version collision.

Piotrek

> On 21 Mar 2018, at 16:40, dim5b <[hidden email]> wrote:
>
> I am building my project and on mvn build i get the error
>
> class file for kafka.utils.VerifiableProperties not found...
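One way to check whether a class such as kafka.utils.VerifiableProperties is visible on a given classpath is a small probe like the one below. This is only a sketch: ClasspathProbe and isPresent are hypothetical names, and the probe reports what the classpath it is launched with can see, which may differ from what Maven uses during compilation.

```java
// Hypothetical diagnostic helper, not part of Flink, Kafka or Confluent.
final class ClasspathProbe {

    private ClasspathProbe() {}

    // Returns true if the named class can be loaded by this class's class loader.
    static boolean isPresent(String className) {
        try {
            // initialize=false: locate the class without running its static initializers
            Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String name = args.length > 0 ? args[0] : "kafka.utils.VerifiableProperties";
        System.out.println(name + " present: " + isPresent(name));
    }
}
```

Running it with the project's dependency jars on the classpath shows whether any of them actually provides the class.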
I added kafka to my dependencies, although I am not sure why this would be required. It seems to work:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_${kafka.scala.version}</artifactId>
        <version>${kafka.version}</version>
    </dependency>

This is my full dependency list:

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>4.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_${kafka.scala.version}</artifactId>
            <version>${kafka.version}</version>
        </dependency>
    </dependencies>

This does solve the issue, but now I am getting the following error:

    java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to eu.avro.CelloAvro
        at eu.neurocom.schema.ConfluentAvroDeserializationSchema.deserialize(ConfluentAvroDeserializationSchema.java:37)
        at eu.neurocom.schema.ConfluentAvroDeserializationSchema.deserialize(ConfluentAvroDeserializationSchema.java:16)
        at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:42)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:139)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:652)
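A possible explanation for the ClassCastException: as far as I know, the Confluent decoder returns GenericData.Record unless its specific.avro.reader setting is enabled, so the cast to a generated class such as CelloAvro fails. Below is a minimal sketch of the settings that would request specific records. DecoderConfig and forSpecificRecords are hypothetical names, the URL is a placeholder, and the wiring into KafkaAvroDecoder via kafka.utils.VerifiableProperties is shown only in a comment because it is an assumption about the 4.0.0 API that should be checked against the Confluent sources.

```java
import java.util.Properties;

// Hypothetical helper: collects the settings a Confluent Avro decoder would need
// to return generated SpecificRecord classes instead of GenericData.Record.
final class DecoderConfig {

    private DecoderConfig() {}

    static Properties forSpecificRecords(String schemaRegistryUrl) {
        Properties props = new Properties();
        // Location of the Schema Registry (the same URL the schema class already receives)
        props.setProperty("schema.registry.url", schemaRegistryUrl);
        // Ask the decoder for generated classes such as CelloAvro
        props.setProperty("specific.avro.reader", "true");
        return props;
    }

    public static void main(String[] args) {
        Properties props = forSpecificRecords("http://localhost:8081"); // placeholder URL
        System.out.println(props);
        // Assumed wiring inside deserialize(), not compiled here:
        //   new KafkaAvroDecoder(schemaRegistry, new VerifiableProperties(props));
    }
}
```

If that constructor is available, it would also suggest why the kafka artifact is needed at build time: the decoder's API refers to kafka.utils.VerifiableProperties.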