NotSerializableException: DropwizardHistogramWrapper inside AggregateFunction

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

NotSerializableException: DropwizardHistogramWrapper inside AggregateFunction

Vijay Balakrishnan
HI,
I have a class defined :

public class MGroupingWindowAggregate implements AggregateFunction.. {
private final Map<String, Object> keyHistMap = new TreeMap<>();
}
In the constructor, I initialize it.
public MGroupingWindowAggregate() {
Histogram minHist = new Histogram(new SlidingTimeWindowReservoir(timeIntervalL, TimeUnit.MINUTES));
org.apache.flink.metrics.Histogram minHistogram = new DropwizardHistogramWrapper(minHist);
Map<String, org.apache.flink.metrics.Histogram> intervalHistMap = new TreeMap<>();
intervalHistMap.putIfAbsent(interval, minHistogram);
keyHistMap.putIfAbsent(operationKey, intervalHistMap);
}
When trying to use it in the add() method of AggregateFunction, it fails saying:
NotSerializableException: org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper

Tried to wrap DropwizardHistogramWrapper inside a serializable Object with Composition but that also didn't work.

Looked at using RichFunction open() based on Stephan's advise here. https://stackoverflow.com/questions/34118469/flink-using-dagger-injections-not-serializable
But cannot use RichFunction with AggrgeateFunction or use RichAggregateFunction

How can I use the DropwizardHistogramWrapper -a non serializable class inside my AggregateFunction ? Trying to use DropwizardHistogramWrapper to get some Histogram percentile stats without re-inventing the wheel.

TIA,
Vijay
Reply | Threaded
Open this post in threaded view
|

Re: NotSerializableException: DropwizardHistogramWrapper inside AggregateFunction

Fabian Hueske-2
Hi,

There are two ways:

1. make the non-serializable member variable transient (meaning that it won't be serialized) and check in the aggregate call if it has been initialized or not.
2. implement your own serialization logic by overriding readObject() and writeObject() [1].

Best, Fabian


Am Do., 6. Juni 2019 um 23:04 Uhr schrieb Vijay Balakrishnan <[hidden email]>:
HI,
I have a class defined :

public class MGroupingWindowAggregate implements AggregateFunction.. {
private final Map<String, Object> keyHistMap = new TreeMap<>();
}
In the constructor, I initialize it.
public MGroupingWindowAggregate() {
Histogram minHist = new Histogram(new SlidingTimeWindowReservoir(timeIntervalL, TimeUnit.MINUTES));
org.apache.flink.metrics.Histogram minHistogram = new DropwizardHistogramWrapper(minHist);
Map<String, org.apache.flink.metrics.Histogram> intervalHistMap = new TreeMap<>();
intervalHistMap.putIfAbsent(interval, minHistogram);
keyHistMap.putIfAbsent(operationKey, intervalHistMap);
}
When trying to use it in the add() method of AggregateFunction, it fails saying:
NotSerializableException: org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper

Tried to wrap DropwizardHistogramWrapper inside a serializable Object with Composition but that also didn't work.

Looked at using RichFunction open() based on Stephan's advise here. https://stackoverflow.com/questions/34118469/flink-using-dagger-injections-not-serializable
But cannot use RichFunction with AggrgeateFunction or use RichAggregateFunction

How can I use the DropwizardHistogramWrapper -a non serializable class inside my AggregateFunction ? Trying to use DropwizardHistogramWrapper to get some Histogram percentile stats without re-inventing the wheel.

TIA,
Vijay
Reply | Threaded
Open this post in threaded view
|

Re: NotSerializableException: DropwizardHistogramWrapper inside AggregateFunction

Vijay Balakrishnan
Thanks,Fabian.
I got around the issue by moving the logic for the DropwizardHistogramWrapper -a non serializable class into the ProcessWindowFunction's open() function.



On Fri, Jun 7, 2019 at 12:33 AM Fabian Hueske <[hidden email]> wrote:
Hi,

There are two ways:

1. make the non-serializable member variable transient (meaning that it won't be serialized) and check in the aggregate call if it has been initialized or not.
2. implement your own serialization logic by overriding readObject() and writeObject() [1].

Best, Fabian


Am Do., 6. Juni 2019 um 23:04 Uhr schrieb Vijay Balakrishnan <[hidden email]>:
HI,
I have a class defined :

public class MGroupingWindowAggregate implements AggregateFunction.. {
private final Map<String, Object> keyHistMap = new TreeMap<>();
}
In the constructor, I initialize it.
public MGroupingWindowAggregate() {
Histogram minHist = new Histogram(new SlidingTimeWindowReservoir(timeIntervalL, TimeUnit.MINUTES));
org.apache.flink.metrics.Histogram minHistogram = new DropwizardHistogramWrapper(minHist);
Map<String, org.apache.flink.metrics.Histogram> intervalHistMap = new TreeMap<>();
intervalHistMap.putIfAbsent(interval, minHistogram);
keyHistMap.putIfAbsent(operationKey, intervalHistMap);
}
When trying to use it in the add() method of AggregateFunction, it fails saying:
NotSerializableException: org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper

Tried to wrap DropwizardHistogramWrapper inside a serializable Object with Composition but that also didn't work.

Looked at using RichFunction open() based on Stephan's advise here. https://stackoverflow.com/questions/34118469/flink-using-dagger-injections-not-serializable
But cannot use RichFunction with AggrgeateFunction or use RichAggregateFunction

How can I use the DropwizardHistogramWrapper -a non serializable class inside my AggregateFunction ? Trying to use DropwizardHistogramWrapper to get some Histogram percentile stats without re-inventing the wheel.

TIA,
Vijay