Kafka Producer - Null Pointer Exception when processing by element


Kafka Producer - Null Pointer Exception when processing by element

earellano
I'm getting a NullPointerException when calling FlinkKafkaProducer010.processElement(StreamRecord&lt;T&gt;). Specifically, the exception comes from its helper function invokeInternally(): the function's internalProducer is not configured properly, so a null value gets passed to one of its helper functions.

We'd really appreciate it if you could take a look below and tell us whether this is a Flink bug or something we're doing wrong.

Our code

This is a simplified version of our program:
[Screenshot: my code - part 1]
[Screenshot: my code - part 2]

You can copy this code here to reproduce locally: https://pastebin.com/Li8iZuFj

Stack trace

Here is the stack trace:
[Screenshot: stack trace of the NullPointerException]

What causes the error in the Flink code

The method processElement() calls invokeInternally(). Within invokeInternally(), Flink tries to resolve values such as the topic name and partitions.

The app fails when trying to resolve the partitions. Specifically, the method that resolves the partitions takes a KafkaProducer parameter, which is passed as null, resulting in the NullPointerException. See the highlighted lines below, from running the program in the debugger.
[Screenshot: Flink code in the debugger, with the null value highlighted]

So I think the issue is that the internalProducer is not being set up correctly: it never sets the value of its producer field, so that field stays null and gets passed around, resulting in the NullPointerException.

Bug? Or issue with our code?

My question to you all: is this a bug that needs to be fixed, or does it result from us improperly configuring our program? The code above shows our configuration within the program itself (just setting bootstrap.servers), and we created the Kafka topic on our local machine as follows:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic process-elements-tests



Any help greatly appreciated! We're really hoping to get processElement() to work, because our streaming architecture requires working on individual elements rather than the entire data stream (sink behavior depends on the individual values within each record of our DataStream&lt;List&lt;T&gt;&gt;).


Re: Kafka Producer - Null Pointer Exception when processing by element

Tzu-Li (Gordon) Tai
Hi,

It seems like you’ve misunderstood how to use the FlinkKafkaProducer, or is there any specific reason why you want to emit elements to Kafka in a map function?

The correct way to use it is to add it as a sink function to your pipeline, i.e.

DataStream<String> someStream = ...
someStream.addSink(new FlinkKafkaProducer010<>("topic", schema, props));
// or, FlinkKafkaProducer010.writeToKafkaWithTimestamps(someStream, "topic", schema, props);

processElement() is invoked internally by the system and isn't intended to be called by user code.

See [1] for more details.

Cheers,
Gordon

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-producer


Re: Kafka Producer - Null Pointer Exception when processing by element

earellano
Tzu-Li (Gordon) Tai wrote
It seems like you’ve misunderstood how to use the FlinkKafkaProducer, or is there any specific reason why you want to emit elements to Kafka in a map function?

The correct way to use it is to add it as a sink function to your pipeline, i.e.

DataStream<String> someStream = ...
someStream.addSink(new FlinkKafkaProducer010<>("topic", schema, props));
// or, FlinkKafkaProducer010.writeToKafkaWithTimestamps(someStream, "topic", schema, props);
The reason we want to use processElement() and a map function instead of someStream.addSink() is that our output logic is conditional, depending on the type of record we have.

Our overall program follows this path:

  Serialized JSON is consumed from Kafka: DataStream<byte[]>
  It is parsed, producing a list of success events and/or error events: DataStream<List<Events>>
  Each event is output conditionally, going to Kafka or Cassandra depending on its type.


This is our code for the output logic (with types modified so as not to expose our IP):

void output(DataStream<List<SuperclassEvent>> inputStream) {
    inputStream.map(eventList -> {
        for (SuperclassEvent event : eventList) {
            if (event instanceof SuccessEvent) {
                emitToCassandra(event);
            } else if (event instanceof ErrorEvent) {
                emitToKafka(event);
            }
        }
        return true;  // we don't actually want to return anything, just don't know how else to use map
    });
}

 
That is, we have sinks for both Kafka and Cassandra, and want to be able to iterate through our List<SuperclassEvent> and conditionally send each individual record to the appropriate sink depending on its type.

I know Flink offers split streams for a similar purpose, but that doesn't seem ideal to us because it requires a new operator to first split the stream, and only after the whole list is processed can the records be sent to their respective sinks. The code above, by contrast, sends each record to its sink immediately upon finding its type.

--

Is there any way to make processElement() work so that we can operate on individual records instead of the whole DataStream? Or are we misusing Flink? What do you recommend as the best way to do this?


--

Also, if processElement() and invoke() aren't meant to be used, should they be made package-private? Happy to make a pull request if so, although I fear that might break a few things.

Re: Kafka Producer - Null Pointer Exception when processing by element

Tzu-Li (Gordon) Tai
Hi,

void output(DataStream<List<SuperclassEvent>> inputStream) {

This seems odd. Are your events intended to be a list? If not, this should be a `DataStream<SuperclassEvent>`.

From the code snippet you’ve attached in the first post, it seems like you’ve initialized your source incorrectly.

`env.fromElements(List<...>)` will take the whole list as a single event, thus your source is only emitting a single list as a record. Perhaps what you actually want to do here is `env.fromCollection(List<...>)`?

This should also eliminate the situation that “only after the whole List is processed can the records then be sent to their respective sinks”, as you mentioned in your reply.
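As a quick sketch of the difference (just illustrative; string payloads stand in for your events here):

List<String> events = Arrays.asList("event-1", "event-2", "event-3");

// fromElements takes varargs, so the whole list becomes ONE record of type List<String>.
DataStream<List<String>> oneRecordTotal = env.fromElements(events);

// fromCollection emits one record PER element of the collection.
DataStream<String> oneRecordPerEvent = env.fromCollection(events);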

but this doesn't seem very ideal to us because it requires a new operator to first split the stream

IMO, this wouldn’t really introduce noticeable overhead, as the operator will be chained to the map operator. Side outputs is also the preferred way here, as side outputs subsume stream splitting.


Overall, I think it is reasonable to do a map -> split -> Kafka / Cassandra sinks in your case, given that you’ve declared the source correctly to be a single SuperclassEvent as a record.

The operator overhead is fairly trivial if it is chained. Another reason to use sinks properly is that only then will you benefit from the exactly-once / at-least-once delivery guarantees to external systems (which requires collaboration between the sink and Flink’s checkpointing).
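A rough sketch of that split step, using the 1.3-era split/select API (just a sketch: `schema`, `props`, and the Cassandra sink function are placeholders, and the event types are the ones from this thread):

// Split a DataStream<SuperclassEvent> by event type, then attach one sink per branch.
SplitStream<SuperclassEvent> branches = events.split(new OutputSelector<SuperclassEvent>() {
    @Override
    public Iterable<String> select(SuperclassEvent event) {
        return Collections.singletonList(event instanceof SuccessEvent ? "success" : "error");
    }
});

branches.select("error").addSink(new FlinkKafkaProducer010<>("error-topic", schema, props));
branches.select("success").addSink(cassandraSink);  // placeholder for your Cassandra sink function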

Hope this helps!

Cheers,
Gordon



Re: Kafka Producer - Null Pointer Exception when processing by element

earellano
Hi,
Tzu-Li (Gordon) Tai wrote
This seems odd. Are your events intended to be a list? If not, this should be a `DataStream<SuperclassEvent>`.

From the code snippet you’ve attached in the first post, it seems like you’ve initialized your source incorrectly.

`env.fromElements(List<...>)` will take the whole list as a single event, thus your source is only emitting a single list as a record.
Ah, sorry for the confusion. The original code snippet isn't our actual code - it's a simplified, generified version so that the NullPointerException is easy to reproduce without having to show our whole code base.

To clarify, our input actually uses a Kafka consumer that reads a byte[], which is then passed to an external library parser that converts it into a List<Events>. This is why we have to use DataStream<List<Events>> rather than just DataStream<Event>: it's a requirement of the parser we have to use, because each byte[] record can produce both SuccessEvents and/or ErrorEvents.

Our motivation for the map and for-loop with conditional output logic above was that we have to work with this whole List<Events>, not just individual Events, but we don't want the event at the beginning of the list to wait for the whole list to be processed before it is output. For example, a byte[] record can come back from our parser as a list of 10 SuccessEvents and 1 ErrorEvent; we want to publish each Event immediately. Low latency is extremely important to us.

--

Regarding the task chaining you mention, could you help clarify how it works, please? With each record of type List<Events>, calling the split operator followed by the sink operators, does the whole record/list have to be split before it can go on to the sink? Or does task chaining mean each element is output to the sink immediately?


Thanks so much for all this help by the way!

Re: Kafka Producer - Null Pointer Exception when processing by element

Tzu-Li (Gordon) Tai
Regarding the task chaining you mention, could you help clarify how it works, please?

Operators can be chained so that they are executed by a single task thread. See [1] for more details on that.

Basically, when two operators are chained together, the output of the first operator is immediately chained to the processElement of the next operator; it’s therefore just a consecutive invocation of processElements on the chained operators. There will be no thread-to-thread handover or buffering.
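As a trivial sketch (names are placeholders), the two operators below are chained into a single task by default, so each element emitted by the flatMap is handed straight to the sink in the same thread:

someStream
    .flatMap(new MyFlatMapFunction())  // hypothetical function
    .addSink(new FlinkKafkaProducer010<>("topic", schema, props));

// Chaining can be controlled explicitly if ever needed:
// env.disableOperatorChaining();          // disable globally
// someStream.map(...).startNewChain();    // start a new chain at this operator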

For example, a byte[] record can come back from our parser as a list of 10 SuccessEvents and 1 ErrorEvent; we want to publish each Event immediately.

In that case, I would suggest using flatMap here, followed by chained splits and then sinks.

Using flatMap, you can collect elements as you iterate through the list (i.e. `collector.collect(...)`). If the sinks are properly chained (which should be the case if there is no keyBy before the sink and you haven't explicitly configured otherwise [2]), then for each .collect(...) the sink write will be invoked as part of the chain.

Effectively, this would then be writing to Kafka / Cassandra for every element as you iterate through that list (happening in the same thread since everything is chained), and matches what you have in mind.
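Concretely, such a flatMap could look roughly like this, using the placeholder event types from this thread:

DataStream<SuperclassEvent> flattened = parsed.flatMap(
    new FlatMapFunction<List<SuperclassEvent>, SuperclassEvent>() {
        @Override
        public void flatMap(List<SuperclassEvent> events, Collector<SuperclassEvent> out) {
            for (SuperclassEvent event : events) {
                // With chained sinks, each collect() immediately triggers the downstream write.
                out.collect(event);
            }
        }
    });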

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/concepts/runtime.html#tasks-and-operator-chains


Re: Kafka Producer - Null Pointer Exception when processing by element

earellano
Tzu-Li (Gordon) Tai wrote
Basically, when two operators are chained together, the output of the first operator is immediately chained to the processElement of the next operator; it’s therefore just a consecutive invocation of processElements on the chained operators. There will be no thread-to-thread handover or buffering.
Okay great, chaining tasks does sound like what we want then.


Tzu-Li (Gordon) Tai wrote
In that case, I would suggest using flatMap here, followed by chained splits and then sinks.
We changed our code to roughly follow this suggestion, but I'm not sure we're doing it correctly. Is there a better way you'd recommend chaining the tasks? As written below, are individual Events within the list sent to their respective sinks right away, or does the whole list have to be split first?
[Screenshot: new code with splitStream()]

We also had issues getting flatMap to work, and map seemed more appropriate: our parser.parse() function has a one-to-one mapping from an input byte[] to a List<SuperclassEvent>, and that never changes, so a map seems to make sense to us. Were you suggesting a flatMap instead of map at this stage of calling our parser, or did you mean a flatMap() after the parser and before the split()?

Re: Kafka Producer - Null Pointer Exception when processing by element

Tzu-Li (Gordon) Tai
Our parser.parse() function has a one-to-one mapping from an input byte[] to a List<SuperclassEvent>

Ideally, this should be handled within the KeyedDeserializationSchema passed to your Kafka consumer. That would avoid the need for an extra "parser map function" after the source.
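A rough sketch of what that could look like (the Parser class and event types are the placeholders from this thread, not actual APIs):

public class EventListSchema implements KeyedDeserializationSchema<List<SuperclassEvent>> {

    private final Parser parser = new Parser();  // placeholder for your external library parser

    @Override
    public List<SuperclassEvent> deserialize(byte[] messageKey, byte[] message,
                                             String topic, int partition, long offset) {
        // Parse each raw Kafka record into a list of events right at the source.
        return parser.parse(message);
    }

    @Override
    public boolean isEndOfStream(List<SuperclassEvent> nextElement) {
        return false;  // this stream never ends
    }

    @Override
    public TypeInformation<List<SuperclassEvent>> getProducedType() {
        return TypeInformation.of(new TypeHint<List<SuperclassEvent>>() {});
    }
}

// Usage: env.addSource(new FlinkKafkaConsumer010<>("input-topic", new EventListSchema(), props));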

Were you suggesting a flatMap instead of map at this stage of calling our parser, or did you mean a flatMap() after the parser and before the split()?

I meant a flatMap after the parser (whether the parsing is done as a map function or within the Kafka source) and before the split. The flatMap function iterates through each per-record list and collects the events as it goes.
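Putting the pieces together, the overall pipeline shape would then be roughly as follows (all class names are placeholders):

DataStream<List<SuperclassEvent>> parsed = env.addSource(
    new FlinkKafkaConsumer010<>("input-topic", new EventListSchema(), props));

SplitStream<SuperclassEvent> branches = parsed
    .flatMap(new EventFlattener())    // the FlatMapFunction sketched earlier
    .split(new EventTypeSelector());  // OutputSelector returning "success" / "error"

branches.select("error").addSink(new FlinkKafkaProducer010<>("error-topic", schema, props));
branches.select("success").addSink(cassandraSink);  // placeholder Cassandra sink function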

- Gordon



