serialization error when using multiple metrics counters


Colin Williams
I've created a RichMapFunction in Scala with multiple counters like:

   lazy val successCounter = getRuntimeContext.getMetricGroup.counter("successfulParse")
   lazy val failedCounter = getRuntimeContext.getMetricGroup.counter("failedParse")
   lazy val errorCounter = getRuntimeContext.getMetricGroup.counter("errorParse")

which I increment in the map function. While testing, I noticed that I have no issues when using a single counter. However, with more than one counter I get a serialization error.

Does anyone know how I can use multiple counters from my RichMapFunction, or what I'm doing wrong?

[info]   org.apache.flink.api.common.InvalidProgramException: The implementation of the RichMapFunction is not serializable. The object probably contains or references non serializable fields.
[info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
[info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:527)
[info]   at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:581)
[info]   at ParsedResultUnwrapperTest$$anonfun$2.apply(ParsedResultUnwrapperTest.scala:27)
[info]   at ParsedResultUnwrapperTest$$anonfun$2.apply(ParsedResultUnwrapperTest.scala:23)
[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   ...
[info]   Cause: java.io.NotSerializableException: org.apache.flink.metrics.SimpleCounter
[info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
[info]   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
[info]   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
[info]   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
[info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
[info]   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
[info]   at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
[info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
[info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
[info]   ...
[info] - ParseResultUnwrapper.errorCounter.getCount should return 1L for a Error -> ParseResult[LineProtocol] *** FAILED ***
[info]   org.apache.flink.api.common.InvalidProgramException: The implementation of the RichMapFunction is not serializable. The object probably contains or references non serializable fields.
[info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
[info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:527)
[info]   at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:581)
[info]   at ParsedResultUnwrapperTest$$anonfun$3.apply(ParsedResultUnwrapperTest.scala:37)
[info]   at ParsedResultUnwrapperTest$$anonfun$3.apply(ParsedResultUnwrapperTest.scala:32)
[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   ...
[info]   Cause: java.io.NotSerializableException: org.apache.flink.metrics.SimpleCounter
[info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
[info]   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
[info]   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
[info]   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
[info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
[info]   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
[info]   at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
[info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
[info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
[info]   ...

Re: serialization error when using multiple metrics counters

Kostas Kloudas
Hi Colin,

Are you initializing your counters from within the open() method of your rich function?
In other words, are you calling

counter = getRuntimeContext.getMetricGroup.counter("my counter")

from within open()?

The Counter interface is not serializable. So if you instantiate the counters outside open(), Flink cannot serialize your function when it tries to ship your code to the cluster, and you get the exception.
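
A minimal sketch of the open()-based pattern described above, assuming the Flink 1.x open(Configuration) signature that was current when this thread was written. The class name ParseCountingMapper and the emptiness check in map() are hypothetical placeholders; the counter names come from the original post.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter

// Hypothetical mapper: the class name and the "non-empty means parsed" rule are illustrative only.
class ParseCountingMapper extends RichMapFunction[String, String] {

  // @transient keeps these fields out of Java serialization.
  // They stay null until open() runs on the task.
  @transient private var successCounter: Counter = _
  @transient private var failedCounter: Counter = _

  // Assumption: Flink 1.x lifecycle hook taking a Configuration.
  override def open(parameters: Configuration): Unit = {
    val group = getRuntimeContext.getMetricGroup
    successCounter = group.counter("successfulParse")
    failedCounter = group.counter("failedParse")
  }

  override def map(value: String): String = {
    if (value.nonEmpty) successCounter.inc() else failedCounter.inc()
    value
  }
}
```

Because the counters are only assigned in open(), the function object holds no Counter instance at the point where the job graph is built and the function is serialized.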

You can have a look at the docs for an example:

Thanks,
Kostas

On Oct 7, 2017, at 11:34 PM, Colin Williams <[hidden email]> wrote:

I've created a RichMapFunction in Scala with multiple counters like:

   lazy val successCounter = getRuntimeContext.getMetricGroup.counter("successfulParse")
   lazy val failedCounter = getRuntimeContext.getMetricGroup.counter("failedParse")
   lazy val errorCounter = getRuntimeContext.getMetricGroup.counter("errorParse")

which I increment in the map function. While testing, I noticed that I have no issues when using a single counter. However, with more than one counter I get a serialization error.

Does anyone know how I can use multiple counters from my RichMapFunction, or what I'm doing wrong?


Re: serialization error when using multiple metrics counters

Stephan Ewen
Interesting. Is there a quirk in Scala whereby using multiple lazy vals can result in eager initialization of some of them?

On Mon, Oct 9, 2017 at 4:37 PM, Kostas Kloudas <[hidden email]> wrote:
Hi Colin,

Are you initializing your counters from within the open() method of your rich function?
In other words, are you calling

counter = getRuntimeContext.getMetricGroup.counter("my counter")

from within open()?

The Counter interface is not serializable. So if you instantiate the counters outside open(), Flink cannot serialize your function when it tries to ship your code to the cluster, and you get the exception.

You can have a look at the docs for an example:

Thanks,
Kostas


Re: serialization error when using multiple metrics counters

swiesman

When a Scala class contains a single lazy val, it is implemented using a boolean flag that tracks whether the field has been evaluated. When a class contains multiple lazy vals, they are implemented with a bit mask shared among the variables. This can lead to inconsistencies as to whether serialization forces evaluation of the field. In general, lazy vals should always be marked @transient for expected behavior.
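
The effect of @transient on lazy vals can be sketched without Flink, using plain Java serialization. Everything named here is an assumption for illustration: FakeCounter is a hypothetical stand-in for a non-serializable metric such as SimpleCounter, and PlainLazy, TransientLazy, and the serializes helper are made-up names.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable metric object.
final class FakeCounter {
  var count = 0L
  def inc(): Unit = count += 1
}

// Plain lazy vals: once a field has been forced, the counter instance is an
// ordinary field and Java serialization tries to write it.
class PlainLazy extends Serializable {
  lazy val successCounter = new FakeCounter
  lazy val failedCounter = new FakeCounter
}

// @transient lazy vals: the backing fields are skipped by Java serialization,
// whether or not they have been forced.
class TransientLazy extends Serializable {
  @transient lazy val successCounter = new FakeCounter
  @transient lazy val failedCounter = new FakeCounter
}

object LazyValSerialization {
  // Returns true if Java serialization of `obj` succeeds,
  // false if it hits a non-serializable field.
  def serializes(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

Calling LazyValSerialization.serializes on a PlainLazy instance should succeed while no counter has been touched and fail once one of them has been forced, whereas a TransientLazy instance should serialize either way.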

Seth

From: Stephan Ewen <[hidden email]>
Date: Monday, October 9, 2017 at 2:44 PM
To: Kostas Kloudas <[hidden email]>
Cc: Colin Williams <[hidden email]>, user <[hidden email]>
Subject: Re: serialization error when using multiple metrics counters

Interesting. Is there a quirk in Scala whereby using multiple lazy vals can result in eager initialization of some of them?


Re: serialization error when using multiple metrics counters

Colin Williams
Thanks everyone, and thank you very much Seth! Adding @transient to the lazy vals is what I needed.
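
For reference, a sketch of what that change might look like when applied to the three counters from the original post. The class name ParseResultCounter, the String-to-Option[Int] signature, and the integer-parsing logic are hypothetical placeholders, since the thread does not show the real map function.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import scala.util.{Failure, Success, Try}

// Hypothetical mapper; input/output types and the parsing rule are placeholders.
class ParseResultCounter extends RichMapFunction[String, Option[Int]] {

  // @transient excludes the fields from serialization; lazy defers the metric
  // lookup until first use inside map(), when the runtime context is available.
  @transient lazy val successCounter = getRuntimeContext.getMetricGroup.counter("successfulParse")
  @transient lazy val failedCounter = getRuntimeContext.getMetricGroup.counter("failedParse")
  @transient lazy val errorCounter = getRuntimeContext.getMetricGroup.counter("errorParse")

  override def map(value: String): Option[Int] =
    Try(value.trim.toInt) match {
      case Success(n) =>
        successCounter.inc()
        Some(n)
      case Failure(_: NumberFormatException) =>
        // Input was present but not a valid integer.
        failedCounter.inc()
        None
      case Failure(_) =>
        // Any other non-fatal problem, e.g. a null input.
        errorCounter.inc()
        None
    }
}
```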

On Mon, Oct 9, 2017 at 1:34 PM, Seth Wiesman <[hidden email]> wrote:

When a Scala class contains a single lazy val, it is implemented using a boolean flag that tracks whether the field has been evaluated. When a class contains multiple lazy vals, they are implemented with a bit mask shared among the variables. This can lead to inconsistencies as to whether serialization forces evaluation of the field. In general, lazy vals should always be marked @transient for expected behavior.

Seth
