Serialization problem for Guava's TreeMultimap

Yukun Guo
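Here is the code snippet:

import com.google.common.collect.TreeMultimap;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// windowedStream is a WindowedStream of Tuple2<String, Long>; topK is defined in the enclosing class
windowedStream.fold(TreeMultimap.<Long, String>create(), new FoldFunction<Tuple2<String, Long>, TreeMultimap<Long, String>>() {
    @Override
    public TreeMultimap<Long, String> fold(TreeMultimap<Long, String> topKSoFar,
                                           Tuple2<String, Long> itemCount) throws Exception {
        String item = itemCount.f0;
        Long count = itemCount.f1;
        topKSoFar.put(count, item);
        // Keep at most topK distinct counts by evicting the smallest one
        if (topKSoFar.keySet().size() > topK) {
            topKSoFar.removeAll(topKSoFar.keySet().first());
        }
        return topKSoFar;
    }
});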
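To make the intended fold logic easy to check outside Flink, here is a JDK-only sketch of the same top-K step. It assumes a TreeMap<Long, TreeSet<String>> as a stand-in for Guava's TreeMultimap<Long, String> (sorted keys, each mapped to a sorted set of values); the class name TopKFold is hypothetical.

```java
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical JDK-only stand-in for the fold above:
// TreeMap<Long, TreeSet<String>> plays the role of TreeMultimap<Long, String>.
class TopKFold {
    private final int topK;

    TopKFold(int topK) {
        this.topK = topK;
    }

    // Adds one (item, count) pair, then evicts the smallest count if more than
    // topK distinct counts are held (as in the original, topK bounds counts, not items).
    TreeMap<Long, TreeSet<String>> fold(TreeMap<Long, TreeSet<String>> topKSoFar, String item, long count) {
        topKSoFar.computeIfAbsent(count, k -> new TreeSet<>()).add(item);
        if (topKSoFar.size() > topK) {
            // Counterpart of topKSoFar.removeAll(topKSoFar.keySet().first()) on the multimap
            topKSoFar.pollFirstEntry();
        }
        return topKSoFar;
    }
}
```

Folding ("a", 5), ("b", 3), ("c", 5), ("d", 7) with topK = 2 into an empty map leaves 5 mapped to [a, c] and 7 mapped to [d].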

The problem is that when the fold function is called, the initial value has been lost, so it throws a NullPointerException. This seems to be caused by failed type extraction and serialization, as shown in the log message:
"INFO  TypeExtractor:1685 - No fields detected for class com.google.common.collect.TreeMultimap. Cannot be used as a PojoType. Will be handled as GenericType."
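The message says Flink found no fields it could use for a POJO type and will treat TreeMultimap as a generic type (serialized through Kryo). Roughly, "no fields" means no non-static, non-transient fields were found. Assuming, as appears to be the case for Guava's collections, that TreeMultimap keeps its state in transient fields and writes it through custom serialization code, such a scan comes up empty. The helper below is a hypothetical, JDK-only illustration of that kind of field scan, not Flink's TypeExtractor.

```java
import java.io.Serializable;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical helper: collects the fields a field-based serializer could work with,
// i.e. non-static, non-transient fields across the class hierarchy.
final class FieldScan {
    private FieldScan() {}

    static List<String> serializableFieldNames(Class<?> type) {
        List<String> names = new ArrayList<>();
        for (Class<?> c = type; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int mod = f.getModifiers();
                if (!Modifier.isStatic(mod) && !Modifier.isTransient(mod)) {
                    names.add(c.getSimpleName() + "." + f.getName());
                }
            }
        }
        return names;
    }
}

// Example type that keeps all of its state in a transient field.
class TransientOnly implements Serializable {
    private static final long serialVersionUID = 1L;
    private transient TreeMap<Long, String> state = new TreeMap<>();
}

// Example type with one plain public field.
class PlainPojo {
    public long count;
}
```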

I have tried the following two ways to fix it but neither of them worked:

1. Writing a class TreeMultimapSerializer that extends Kryo's Serializer<T> and registering it with `env.addDefaultKryoSerializer(TreeMultimap.class, new TreeMultimapSerializer())`. The write/read methods are almost line-by-line translations of TreeMultimap's own implementation.

2. TreeMultimap implements the Serializable interface, so Kryo can fall back to standard Java serialization. Since Kryo's JavaSerializer is itself not serializable, I wrote an adapter to make it fit the addDefaultKryoSerializer API.
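For approach 1, the write and read halves of a hand-written serializer have to mirror each other exactly, and the empty initial value must round-trip as well. The sketch below shows one possible layout (number of keys; then, per key, the key, its value count and the values) using only java.io, with TreeMap<Long, TreeSet<String>> as a stand-in for TreeMultimap<Long, String>; SortedMultimapCodec is a hypothetical name. In a real Kryo serializer the same steps would go through Kryo's Output and Input, and a TreeMultimap built with custom comparators would also need those comparators recorded. The sketch assumes natural ordering, which is what TreeMultimap.create() uses.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical write/read pair for a sorted multimap stand-in (natural ordering assumed).
final class SortedMultimapCodec {
    private SortedMultimapCodec() {}

    static void write(DataOutputStream out, TreeMap<Long, TreeSet<String>> map) throws IOException {
        out.writeInt(map.size());                    // number of distinct keys
        for (Map.Entry<Long, TreeSet<String>> e : map.entrySet()) {
            out.writeLong(e.getKey());               // the key
            out.writeInt(e.getValue().size());       // how many values follow
            for (String v : e.getValue()) {
                out.writeUTF(v);                     // each value (at most 65535 encoded bytes)
            }
        }
    }

    // Must consume exactly what write produced, in the same order.
    static TreeMap<Long, TreeSet<String>> read(DataInputStream in) throws IOException {
        TreeMap<Long, TreeSet<String>> map = new TreeMap<>();
        int keyCount = in.readInt();
        for (int i = 0; i < keyCount; i++) {
            long key = in.readLong();
            int valueCount = in.readInt();
            TreeSet<String> values = new TreeSet<>();
            for (int j = 0; j < valueCount; j++) {
                values.add(in.readUTF());
            }
            map.put(key, values);
        }
        return map;
    }

    // Convenience round trip through an in-memory byte array.
    static TreeMap<Long, TreeSet<String>> roundTrip(TreeMap<Long, TreeSet<String>> map) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                write(out, map);
            }
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return read(in);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```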

Could you please give me some working examples of custom Kryo serialization in Flink?


Best regards,
Yukun

Re: Serialization problem for Guava's TreeMultimap

Stephan Ewen
Hi!

Can you use "env.getConfig().registerTypeWithKryoSerializer(TreeMultimap.class, JavaSerializer.class)"?

Best,
Stephan
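Kryo's JavaSerializer delegates to plain java.io object streams, so registering it for TreeMultimap (which implements Serializable) amounts to the round trip sketched below. JavaSerializationRoundTrip is a hypothetical helper, and java.util.TreeMap (also Serializable) stands in for TreeMultimap so that the example needs only the JDK.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical helper: copies a Serializable value through java.io object streams,
// the mechanism a Java-serialization fallback relies on.
final class JavaSerializationRoundTrip {
    private JavaSerializationRoundTrip() {}

    @SuppressWarnings("unchecked")
    static <T extends Serializable> T copy(T value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);   // serialize the whole object graph
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();   // rebuild an independent copy
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```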


(In reply to Yukun Guo, Sun, Sep 18, 2016 at 12:53 PM)

Re: Serialization problem for Guava's TreeMultimap

Yukun Guo
Hi,

The same error occurs after changing the code, unfortunately.

BTW, registerTypeWithKryoSerializer requires the 2nd argument to be a `T serializer` where T extends Serializer<?> & Serializable, so I pass a custom GenericJavaSerializer<T>, but I guess this doesn't matter much.
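The bound quoted above, T extends Serializer<?> & Serializable, is a Java intersection type: the argument must be a Kryo serializer and Serializable at the same time, presumably so the instance can be shipped with the job configuration. The JDK-only sketch below mimics that shape with hypothetical stand-ins (Codec for Kryo's Serializer, CodecRegistry for the registration method) and shows one way to build such an adapter: a subclass that adds implements Serializable.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for Kryo's Serializer<T>; deliberately NOT Serializable.
abstract class Codec<T> {
    abstract String describe();
}

// A codec that does not implement Serializable.
class PlainJavaCodec extends Codec<Object> {
    @Override
    String describe() {
        return "java-serialization";
    }
}

// Adapter: same behaviour, but also Serializable, so it satisfies the bound below.
class SerializableJavaCodec extends PlainJavaCodec implements Serializable {
    private static final long serialVersionUID = 1L;
}

// Hypothetical registry with the same intersection bound as the quoted API.
class CodecRegistry {
    private final Map<Class<?>, Codec<?>> codecs = new LinkedHashMap<>();

    // register(SomeType.class, new PlainJavaCodec()) would not compile here.
    <S extends Codec<?> & Serializable> void register(Class<?> type, S codec) {
        codecs.put(type, codec);
    }

    Codec<?> lookup(Class<?> type) {
        return codecs.get(type);
    }

    // True if the instance can be written with Java serialization.
    static boolean isShippable(Object codec) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(codec);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```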


(In reply to Stephan Ewen, 19 September 2016 at 18:02)

Re: Serialization problem for Guava's TreeMultimap

Yukun Guo
Some more detail: if I run the FoldFunction on a KeyedStream, everything works fine, so the problem must be related to the way WindowedStream handles type extraction.

In case any Flink experts would like to reproduce it, I have created a repo on GitHub: github.com/gyk/flink-multimap

(In reply to Yukun Guo, 20 September 2016 at 10:33)

Re: Serialization problem for Guava's TreeMultimap

Fabian Hueske
Hi Yukun,

I debugged this issue and found that this is a bug in the serialization of the StateDescriptor.
I have created FLINK-4640 [1] to resolve the issue.

Thanks for reporting the issue.

Best, Fabian
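Fabian does not describe the bug in detail here. As a rough, hypothetical illustration of why a descriptor's own serialization matters for a fold's initial value: a descriptor shipped via Java serialization that carries a default value which is not itself Serializable has to mark that field transient and encode it by hand. If that step is skipped or broken, the receiving side sees null. This is not Flink's StateDescriptor code and not the FLINK-4640 fix; Accumulator and DescriptorSketch are made-up names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical default-value type that is deliberately NOT Serializable.
final class Accumulator {
    final List<String> items = new ArrayList<>();
}

// Hypothetical descriptor: Serializable itself, but its default value is transient
// and written by hand in writeObject/readObject.
final class DescriptorSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    final String name;
    transient Accumulator defaultValue;

    DescriptorSketch(String name, Accumulator defaultValue) {
        this.name = name;
        this.defaultValue = defaultValue;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();                 // non-transient fields (name)
        out.writeBoolean(defaultValue != null);   // presence flag
        if (defaultValue != null) {
            out.writeInt(defaultValue.items.size());
            for (String s : defaultValue.items) {
                out.writeUTF(s);
            }
        }
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        // Without this block the transient field would silently come back as null.
        if (in.readBoolean()) {
            Accumulator acc = new Accumulator();
            int n = in.readInt();
            for (int i = 0; i < n; i++) {
                acc.items.add(in.readUTF());
            }
            defaultValue = acc;
        }
    }

    // Simulates shipping the descriptor: Java-serialize it and read back a copy.
    static DescriptorSketch ship(DescriptorSketch d) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(d);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (DescriptorSketch) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```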

(In reply to Yukun Guo, 2016-09-20 10:35 GMT+02:00)

Re: Serialization problem for Guava's TreeMultimap

Yukun Guo
Thank you for quickly fixing it!


(In reply to Fabian Hueske, 20 September 2016 at 17:17)