Null Pointer Exception on Trying to read a message from Kafka

Null Pointer Exception on Trying to read a message from Kafka

Sridhar Chellappa
Folks,

I have a KafkaConsumer that I am trying to read messages from. When I create a DataStream from the KafkaConsumer (env.addSource()), I get the following exception:

Any idea how this can happen?

java.lang.NullPointerException
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:748)

Re: Null Pointer Exception on Trying to read a message from Kafka

Ted Yu
Which versions of Flink and Kafka are you using?

Can you show the snippet of code where you create the DataStream?

Cheers

Re: Null Pointer Exception on Trying to read a message from Kafka

Sridhar Chellappa
DataStream<MyKafkaMessage> MyKafkaMessageDataStream = env.addSource(
                getStreamSource(env, parameterTool)
        );



        public RichParallelSourceFunction<MyKafkaMessage> getStreamSource(StreamExecutionEnvironment env, ParameterTool parameterTool) {

           // MyKafkaMessage is a Protobuf message
            env.getConfig().registerTypeWithKryoSerializer(MyKafkaMessage.class, ProtobufSerializer.class);

            KafkaDataSource<MyKafkaMessage> flinkCepConsumer =
                    new KafkaDataSource<MyKafkaMessage>(parameterTool, new MyKafkaMessageSerDeSchema());

            return flinkCepConsumer;
        }


public class KafkaDataSource<T> extends FlinkKafkaConsumer010<T> {

    public KafkaDataSource(ParameterTool parameterTool, DeserializationSchema<T> deserializer) {
        super(
                Arrays.asList(parameterTool.getRequired("topic").split(",")),
                deserializer,
                parameterTool.getProperties()
        );

    }

}

public class MyKafkaMessageSerDeSchema implements DeserializationSchema<MyKafkaMessage>, SerializationSchema<MyKafkaMessage> {

    @Override
    public MyKafkaMessage deserialize(byte[] message) throws IOException {
        MyKafkaMessage MyKafkaMessage = null;
        try {
            MyKafkaMessage =  MyKafkaMessage.parseFrom(message);
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        } finally {
            return MyKafkaMessage;
        }
    }

    @Override
    public boolean isEndOfStream(MyKafkaMessage nextElement) {
        return false;
    }

    @Override
    public TypeInformation<MyKafkaMessage> getProducedType() {
        return null;
    }

    @Override
    public byte[] serialize(MyKafkaMessage element) {
        return new byte[0];
    }
}
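
As a note on wiring, parameterTool.getProperties() is what ends up as the Kafka consumer configuration, so besides the required "topic" parameter it has to carry the usual consumer settings such as bootstrap.servers and group.id. A minimal sketch of how the source above gets hooked into a job (argument names and values are just placeholders):

// Placeholder launch arguments:
//   --topic my-topic --bootstrap.servers localhost:9092 --group.id my-group
ParameterTool parameterTool = ParameterTool.fromArgs(args);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<MyKafkaMessage> myKafkaMessageDataStream =
        env.addSource(getStreamSource(env, parameterTool));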


Re: Null Pointer Exception on Trying to read a message from Kafka

Ted Yu
Which Flink version are you using (so that line numbers can be matched with the source code)?


Re: Null Pointer Exception on Trying to read a message from Kafka

Sridhar Chellappa
1.3.0


Re: Null Pointer Exception on Trying to read a message from Kafka

Sridhar Chellappa
Kafka version is 0.10.0.


Re: Null Pointer Exception on Trying to read a message from Kafka

Ted Yu
The NPE came from this line:

        StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));

Either serializer or castRecord was null.

I wonder if this has been fixed in the 1.3.2 release.
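
Looking at the schema posted earlier, both candidates are plausible: deserialize() returns null from the finally block whenever parseFrom() throws, and getProducedType() returns null, so no usable type information (and hence no record serializer) exists for the source output. A minimal sketch of a non-null produced type, assuming MyKafkaMessage is the Protobuf-generated class:

    @Override
    public TypeInformation<MyKafkaMessage> getProducedType() {
        // Let Flink derive type information for the message class.
        return TypeExtractor.getForClass(MyKafkaMessage.class);
    }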


Re: Null Pointer Exception on Trying to read a message from Kafka

Sridhar Chellappa
OK, I got past the problem. Basically, I had to change MyKafkaMessageSerDeSchema as follows:

public class MyKafkaMessageSerDeSchema implements DeserializationSchema<MyKafkaMessage>, SerializationSchema<MyKafkaMessage> {

    @Override
    public MyKafkaMessage deserialize(byte[] message) throws IOException {
        MyKafkaMessage MyKafkaMessage = null;
        try {
            MyKafkaMessage =  MyKafkaMessage.parseFrom(message);
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        } finally {
            return MyKafkaMessage;
        }
    }

    @Override
    public boolean isEndOfStream(MyKafkaMessage nextElement) {
        return false;
    }

    @Override
    public TypeInformation<MyKafkaMessage> getProducedType() {
        return TypeExtractor.getForClass(MyKafkaMessage.class); // changed: add type info
    }

    @Override
    public byte[] serialize(MyKafkaMessage element) {
        return element.toByteArray(); // changed: serialize the Protobuf message
    }
}
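
For reference, a fully cleaned-up version of the same schema (a sketch: it assumes MyKafkaMessage is a standard Protobuf-generated class, so parseFrom()/toByteArray() are available, and it surfaces parse failures as an IOException instead of returning null):

public class MyKafkaMessageSerDeSchema implements DeserializationSchema<MyKafkaMessage>, SerializationSchema<MyKafkaMessage> {

    @Override
    public MyKafkaMessage deserialize(byte[] message) throws IOException {
        try {
            return MyKafkaMessage.parseFrom(message);
        } catch (InvalidProtocolBufferException e) {
            // Propagate the failure instead of silently emitting a null record.
            throw new IOException("Failed to parse MyKafkaMessage", e);
        }
    }

    @Override
    public boolean isEndOfStream(MyKafkaMessage nextElement) {
        return false;
    }

    @Override
    public TypeInformation<MyKafkaMessage> getProducedType() {
        // Non-null type information so Flink can build a serializer for the source output.
        return TypeExtractor.getForClass(MyKafkaMessage.class);
    }

    @Override
    public byte[] serialize(MyKafkaMessage element) {
        return element.toByteArray();
    }
}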


When I run my program, I get another exception:


java.lang.NullPointerException
	at shaded.com.google.protobuf.UnmodifiableLazyStringList.size(UnmodifiableLazyStringList.java:68)
	at java.util.AbstractList.add(AbstractList.java:108)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:131)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:22)
	at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
	at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:236)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:748)
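
The second trace suggests that Kryo's default CollectionSerializer is trying to copy a Protobuf repeated field (the immutable UnmodifiableLazyStringList) and ends up with a half-constructed instance, hence the NPE in size(). One commonly suggested direction, not confirmed in this thread, is to make sure the Protobuf type is handled by the registered ProtobufSerializer (from the chill-protobuf dependency) rather than by the POJO/Kryo default path, for example:

// Sketch only; assumes com.twitter.chill.protobuf.ProtobufSerializer is on the classpath,
// as already used in registerTypeWithKryoSerializer() above.
env.getConfig().enableForceKryo();
env.getConfig().registerTypeWithKryoSerializer(MyKafkaMessage.class, ProtobufSerializer.class);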
