Kafka SimpleStringConsumer NPE

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Kafka SimpleStringConsumer NPE

dbciar
Hello Everyone,

I was wondering if anyone could help shed light on where I have introduced an error into my code to get the following error:

java.lang.NullPointerException
        at java.lang.String.<init>(String.java:556)
        at org.apache.flink.streaming.util.serialization.SimpleStringSchema.deserialize(SimpleStringSchema.java:34)
        at org.apache.flink.streaming.util.serialization.SimpleStringSchema.deserialize(SimpleStringSchema.java:27)
        at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:39)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:227)
        at java.lang.Thread.run(Thread.java:745)

I get this error while running a job that connects to kafka from a local deployment.  Could it be to do with how I'm packaging the Jar before uploading it to the cluster?  

The job plan is created and deployed OK via the management website, but as soon as data is added to Kafka I get the above and the job stops.  Using Kafka's own console consumer script, I validated the kafka queue and the data looks exactly like the testing data I used when reading from local files.

Any help as always appreciated,
Cheers,
David
Reply | Threaded
Open this post in threaded view
|

Re: Kafka SimpleStringConsumer NPE

Tzu-Li (Gordon) Tai
Hi David,

Is it possible that your Kafka installation is an older version than 0.9? Or you may have used a different Kafka client major version in your job jar's dependency?
This seems like an odd incompatible protocol with the Kafka broker to me, as the client in the Kafka consumer is reading null record bytes.

Regards,
Gordon


On September 4, 2016 at 7:17:04 AM, dbciar ([hidden email]) wrote:

Hello Everyone,

I was wondering if anyone could help shed light on where I have introduced
an error into my code to get the following error:

java.lang.NullPointerException
at java.lang.String.<init>(String.java:556)
at
org.apache.flink.streaming.util.serialization.SimpleStringSchema.deserialize(SimpleStringSchema.java:34)
at
org.apache.flink.streaming.util.serialization.SimpleStringSchema.deserialize(SimpleStringSchema.java:27)
at
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:39)
at
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:227)
at java.lang.Thread.run(Thread.java:745)

I get this error while running a job that connects to kafka from a local
deployment. Could it be to do with how I'm packaging the Jar before
uploading it to the cluster?

The job plan is created and deployed OK via the management website, but as
soon as data is added to Kafka I get the above and the job stops. Using
Kafka's own console consumer script, I validated the kafka queue and the
data looks exactly like the testing data I used when reading from local
files.

Any help as always appreciated,
Cheers,
David



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-SimpleStringConsumer-NPE-tp8888.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Reply | Threaded
Open this post in threaded view
|

Re: Kafka SimpleStringConsumer NPE

Maximilian Michels
Your Kafka topic seems to contain null values. By default, Flink will
just forward null values to the DeserializationSchema which has to
take care of null values. The SimpleStringSchema doesn't do that and
fails with a NullPointerException. Thus, you need an additional check
in your DeserializationSchema to handle null values.

On Sun, Sep 4, 2016 at 2:46 PM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:

> Hi David,
>
> Is it possible that your Kafka installation is an older version than 0.9? Or
> you may have used a different Kafka client major version in your job jar's
> dependency?
> This seems like an odd incompatible protocol with the Kafka broker to me, as
> the client in the Kafka consumer is reading null record bytes.
>
> Regards,
> Gordon
>
>
> On September 4, 2016 at 7:17:04 AM, dbciar ([hidden email]) wrote:
>
> Hello Everyone,
>
> I was wondering if anyone could help shed light on where I have introduced
> an error into my code to get the following error:
>
> java.lang.NullPointerException
> at java.lang.String.<init>(String.java:556)
> at
> org.apache.flink.streaming.util.serialization.SimpleStringSchema.deserialize(SimpleStringSchema.java:34)
> at
> org.apache.flink.streaming.util.serialization.SimpleStringSchema.deserialize(SimpleStringSchema.java:27)
> at
> org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:39)
> at
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:227)
> at java.lang.Thread.run(Thread.java:745)
>
> I get this error while running a job that connects to kafka from a local
> deployment. Could it be to do with how I'm packaging the Jar before
> uploading it to the cluster?
>
> The job plan is created and deployed OK via the management website, but as
> soon as data is added to Kafka I get the above and the job stops. Using
> Kafka's own console consumer script, I validated the kafka queue and the
> data looks exactly like the testing data I used when reading from local
> files.
>
> Any help as always appreciated,
> Cheers,
> David
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-SimpleStringConsumer-NPE-tp8888.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.