Flink Failing to Connect to Kafka org.apache.kafka.common.protocol.types.SchemaException: Error computing size for field 'topics': java.lang.NullPointerException

Oliver Buckley-Salmon

Hi,

I have a Flink 1.4.0 cluster running on OpenShift with a job that connects to a Kafka 0.11.0.1 cluster in the same OpenShift project. The job reads from one topic and writes to two others.

The job deploys OK, but when it starts up it immediately crashes with the following exception:

org.apache.kafka.common.protocol.types.SchemaException: Error computing size for field 'topics': java.lang.NullPointerException
         at org.apache.kafka.common.protocol.types.Schema.sizeOf(Schema.java:93)
         at org.apache.kafka.common.protocol.types.Struct.sizeOf(Struct.java:258)
         at org.apache.kafka.common.requests.AbstractRequestResponse.serialize(AbstractRequestResponse.java:28)
         at org.apache.kafka.common.requests.AbstractRequest.serialize(AbstractRequest.java:81)
         at org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:74)
         at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:396)
         at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:370)
         at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:332)
         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:409)
         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:252)
         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:184)
         at org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:314)
         at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1386)
         at org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.getAllPartitionsForTopics(Kafka09PartitionDiscoverer.java:75)
         at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:128)
         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:415)
         at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
         at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
         at java.lang.Thread.run(Thread.java:748)

 

The version of the Flink Kafka connector I’m using is:

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.4.0</version>
</dependency>

 

I can write to and consume from the Kafka cluster, and I can see the brokers in ZooKeeper. Can anyone tell me what the exception means and what I can do to resolve it?

Thanks very much in advance for your help.

 

Kind regards,
Oliver Buckley-Salmon




 



---
This e-mail may contain confidential and/or privileged information. If you are not the intended recipient (or have received this e-mail in error) please notify the sender immediately and delete this e-mail. Any unauthorized copying, disclosure or distribution of the material in this e-mail is strictly forbidden.

Please refer to https://www.db.com/disclosures for additional EU corporate and regulatory disclosures and to http://www.db.com/unitedkingdom/content/privacy.htm for information about privacy.

RE: Flink Failing to Connect to Kafka org.apache.kafka.common.protocol.types.SchemaException: Error computing size for field 'topics': java.lang.NullPointerException

Oliver Buckley-Salmon

Sorry, there was an issue in my code: I was creating a Kafka 0.10 consumer.

Problem solved.
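
For anyone who hits the same exception: the thread does not explain why the mismatch triggered it, only that the job created a Kafka 0.10 consumer while depending on the 0.11 connector. Flink's version-specific consumer classes follow a naming convention (for example `FlinkKafkaConsumer010` and `FlinkKafkaConsumer011`), so a mismatch like this can be spotted by comparing the class name with the connector's `artifactId`. The following is a minimal sketch of such a check. `ConnectorVersionCheck` and its methods are hypothetical names invented here, not part of Flink, and the check only assumes the naming convention holds.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper, not part of Flink: compares a consumer class name with a connector artifactId. */
final class ConnectorVersionCheck {

    // Matches class names such as FlinkKafkaConsumer010 or FlinkKafkaConsumer011.
    private static final Pattern CLASS_SUFFIX =
            Pattern.compile("FlinkKafkaConsumer0(\\d+)$");

    // Matches artifact IDs such as flink-connector-kafka-0.11_2.11.
    private static final Pattern ARTIFACT_VERSION =
            Pattern.compile("flink-connector-kafka-(0\\.\\d+)_");

    /** Returns the Kafka version implied by the class name, e.g. "0.10" for FlinkKafkaConsumer010. */
    static String kafkaVersionOfClass(String className) {
        Matcher m = CLASS_SUFFIX.matcher(className);
        if (!m.find()) {
            throw new IllegalArgumentException("Unrecognised consumer class: " + className);
        }
        return "0." + m.group(1);
    }

    /** Returns the Kafka version named in the artifactId, e.g. "0.11". */
    static String kafkaVersionOfArtifact(String artifactId) {
        Matcher m = ARTIFACT_VERSION.matcher(artifactId);
        if (!m.find()) {
            throw new IllegalArgumentException("Unrecognised connector artifact: " + artifactId);
        }
        return m.group(1);
    }

    /** True when the consumer class and the connector artifact target the same Kafka version. */
    static boolean matches(String className, String artifactId) {
        return kafkaVersionOfClass(className).equals(kafkaVersionOfArtifact(artifactId));
    }

    public static void main(String[] args) {
        String artifact = "flink-connector-kafka-0.11_2.11";
        System.out.println(matches("FlinkKafkaConsumer010", artifact)); // prints false
        System.out.println(matches("FlinkKafkaConsumer011", artifact)); // prints true
    }
}
```

With the dependency from the original post, the check flags `FlinkKafkaConsumer010` as a mismatch and accepts `FlinkKafkaConsumer011`.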

 

From: Oliver Buckley-Salmon
Sent: 07 September 2018 15:04
To: [hidden email]
Subject: Flink Failing to Connect to Kafka org.apache.kafka.common.protocol.types.SchemaException: Error computing size for field 'topics': java.lang.NullPointerException

 
