Why I am getting Null pointer exception while accessing RuntimeContext in FlinkKafkaProducer010 ?

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

Why I am getting Null pointer exception while accessing RuntimeContext in FlinkKafkaProducer010 ?

sohimankotia
I have a class extending FlinkKafkaProducer010 . In Invoke method I am using this.getRuntimeContext to increment flink counter .

But I am getting null value of RuntimeContext which is leading to NullPointerException .

Using Flink 1.2
Reply | Threaded
Open this post in threaded view
|

Re: Why I am getting Null pointer exception while accessing RuntimeContext in FlinkKafkaProducer010 ?

Tzu-Li (Gordon) Tai
Hi,

How are you using your extended sink?

The runtime context is provided to the UDF by the system when the job is executed.
So, for example, if you’re just testing out your UDF in unit tests, `getRuntimeContext` would return null (simply because it isn’t set). In such cases, you would need to inject a mock runtime context to perform the test.

Does this explain your case?

Cheers,
Gordon

On 17 April 2017 at 6:45:31 PM, sohimankotia ([hidden email]) wrote:

I have a class extending FlinkKafkaProducer010 . In Invoke method I am using
this.getRuntimeContext to increment flink counter .

But I am getting null value of RuntimeContext which is leading to
NullPointerException .

Using Flink 1.2



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Why-I-am-getting-Null-pointer-exception-while-accessing-RuntimeContext-in-FlinkKafkaProducer010-tp12633.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Reply | Threaded
Open this post in threaded view
|

Re: Why I am getting Null pointer exception while accessing RuntimeContext in FlinkKafkaProducer010 ?

sohimankotia
Thanks for Reply Gordon . I am not doing any unit test . Its production code .

public class MyClass extends FlinkKafkaProducer010<BasicInfoTuple>

and then

incrementCounter("items-not-found", this.getRuntimeContext());
Reply | Threaded
Open this post in threaded view
|

Re: Why I am getting Null pointer exception while accessing RuntimeContext in FlinkKafkaProducer010 ?

Aljoscha Krettek
The reason for this is that FlinkKafkaProducer010 is a peculiar hybrid of a StreamSink (a subclass of StreamOperator) and SinkFunction/RichFunction (the interface for user functions). The runtime context of this class is only set when you use it as an operator, which happens when you use FlinkKafkaProducer010.writeToKafkaWithTimestamps(). When used as a SinkFunction, i.e. DataStream.addSink() this doesn’t work.

I created an issue for resolving this in a general way: https://issues.apache.org/jira/browse/FLINK-6323

> On 17. Apr 2017, at 13:15, sohimankotia <[hidden email]> wrote:
>
> Thanks for Reply Gordon . I am not doing any unit test . Its production code
> .
>
> public class MyClass extends FlinkKafkaProducer010<BasicInfoTuple>
>
> and then
>
> incrementCounter("items-not-found", this.getRuntimeContext());
>
>
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Why-I-am-getting-Null-pointer-exception-while-accessing-RuntimeContext-in-FlinkKafkaProducer010-tp12633p12635.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Reply | Threaded
Open this post in threaded view
|

Re: Why I am getting Null pointer exception while accessing RuntimeContext in FlinkKafkaProducer010 ?

sohimankotia
Thanks Aljoscha for reply .

Is there any way where I can add custom metrics or counter  in FlinkKafkaProducer010 ?

Reply | Threaded
Open this post in threaded view
|

Re: Why I am getting Null pointer exception while accessing RuntimeContext in FlinkKafkaProducer010 ?

Aljoscha Krettek
I’m afraid you would have to copy the code and to the edits until we resolve the issue.

> On 19. Apr 2017, at 06:38, sohimankotia <[hidden email]> wrote:
>
> Thanks Aljoscha for reply .
>
> Is there any way where I can add custom metrics or counter  in
> FlinkKafkaProducer010 ?
>
>
>
>
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Why-I-am-getting-Null-pointer-exception-while-accessing-RuntimeContext-in-FlinkKafkaProducer010-tp12633p12668.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.