KafkaSource Problem

Bobby Richard
I'm receiving the following exception when trying to use a KafkaSource from the new DataSource API.

Exception in thread "main" java.lang.NullPointerException
    at org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79)
    at org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715)

Here is my code (Kotlin):

val kafkaSource = buildKafkaSource(params)
val datastream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka")

private fun buildKafkaSource(params: ParameterTool): KafkaSource<String> {
    val builder = KafkaSource.builder<String>()
        .setBootstrapServers(params.get("bootstrapServers"))
        .setGroupId(params.get("groupId"))
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setTopics("topic")
        .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java))

    if (params.getBoolean("boundedSource", false)) {
        builder.setBounded(OffsetsInitializer.latest())
    }

    return builder.build()
}

I'm setting the deserializer through the ValueDeserializerWrapper (via KafkaRecordDeserializer.valueOnly), as described in the KafkaSourceBuilder javadoc example: https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.html

Looking at the code for the ValueDeserializerWrapper, it appears that the deserializer isn't instantiated until the deserialize method is called, but getProducedType is called first, which results in the NullPointerException. What am I missing?
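
The call-order problem described above can be reproduced without Flink. The following is only a minimal sketch: LazyWrapper, Delegate and StringDelegate are hypothetical stand-ins, not Flink classes, and they merely mimic a wrapper whose delegate is created on the first deserialize call while a type query arrives earlier.

```kotlin
// Hypothetical stand-ins; none of these types exist in Flink.
interface Delegate<T> {
    fun decode(bytes: ByteArray): T
    fun typeName(): String
}

class StringDelegate : Delegate<String> {
    override fun decode(bytes: ByteArray): String = bytes.decodeToString()
    override fun typeName(): String = "String"
}

class LazyWrapper<T>(private val factory: () -> Delegate<T>) {
    // Stays null until deserialize() has run at least once.
    private var delegate: Delegate<T>? = null

    fun deserialize(bytes: ByteArray): T {
        if (delegate == null) delegate = factory()
        return delegate!!.decode(bytes)
    }

    // Touches the delegate without creating it, so an early call fails.
    fun producedType(): String = delegate!!.typeName()
}

fun main() {
    val wrapper = LazyWrapper<String> { StringDelegate() }

    // Asking for the type before any record was deserialized throws.
    val failedEarly = try {
        wrapper.producedType()
        false
    } catch (e: NullPointerException) {
        true
    }
    println(failedEarly) // prints true

    // After the first deserialize call the delegate exists and the query works.
    wrapper.deserialize("hi".toByteArray())
    println(wrapper.producedType()) // prints String
}
```

If the real wrapper behaves like this sketch, any type query issued while the job graph is being built (before a single record is read) would hit the uninitialized field.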

Thanks,
Bobby

This electronic communication and the information and any files transmitted with it, or attached to it, are confidential and are intended solely for the use of the individual or entity to whom it is addressed and may contain information that is confidential, legally privileged, protected by privacy laws, or otherwise restricted from disclosure to anyone else. If you are not the intended recipient or the person responsible for delivering the e-mail to the intended recipient, you are hereby notified that any use, copying, distributing, dissemination, forwarding, printing, or copying of this e-mail is strictly prohibited. If you received this e-mail in error, please return the e-mail to the sender, delete it from your computer, and destroy any printed copy of it.

Re: KafkaSource Problem

Till Rohrmann
Hi Bobby,

This is most likely a bug in Flink. Thanks a lot for reporting the issue and analyzing it. I have created an issue for tracking it [1].

cc Becket.


Cheers,
Till

On Mon, Mar 8, 2021 at 3:35 PM Bobby Richard <[hidden email]> wrote:
[...]

Re: KafkaSource Problem

Bobby Richard
Great, thanks. I was able to work around the issue by implementing my own KafkaRecordDeserializer. I will take a stab at a PR to fix the bug; it should be an easy fix.
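
The workaround code itself was not posted. A minimal sketch of what such a custom deserializer might look like follows. It assumes the KafkaRecordDeserializer interface exposes deserialize(ConsumerRecord, Collector) plus getProducedType(), as in the Flink release current at the time of this thread; later releases may differ. StringValueDeserializer is a hypothetical name.

```kotlin
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer
import org.apache.flink.util.Collector
import org.apache.kafka.clients.consumer.ConsumerRecord

// Hypothetical class name; a sketch, not the poster's actual workaround.
class StringValueDeserializer : KafkaRecordDeserializer<String> {

    // Emits the record value as a UTF-8 string; null values (tombstones) are skipped.
    override fun deserialize(
        record: ConsumerRecord<ByteArray, ByteArray>,
        collector: Collector<String>
    ) {
        record.value()?.let { collector.collect(String(it, Charsets.UTF_8)) }
    }

    // Returns the type directly, so nothing depends on deserialize() having run first.
    override fun getProducedType(): TypeInformation<String> = Types.STRING
}
```

Under these assumptions it would replace the valueOnly call in the builder: .setDeserializer(StringValueDeserializer()). Because the produced type is a constant, it can be queried while the job graph is built, before any record has been read.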

On Tue, Mar 9, 2021 at 9:26 AM Till Rohrmann <[hidden email]> wrote:
[...]


--

Bobby Richard
R&D Software Engineer   | Information Security Group   | Symantec Enterprise Division
Broadcom

mobile: 337.794.2128

Atlanta, GA (USA)
[hidden email]   | broadcom.com



Re: KafkaSource Problem

Till Rohrmann
Great to hear. Yes, if you can help fix this issue that would be great.

Cheers,
Till

On Tue, Mar 9, 2021 at 3:41 PM Bobby Richard <[hidden email]> wrote:
[...]