Serialization questions

Serialization questions

Flavio Pompermaier
Hi to all,
I was trying to check whether our jobs are properly typed or not.
I started by disabling generic types [1] to discover untyped transformations, and added the proper returns() calls to the operators.

Unfortunately there are jobs where we serialize Thrift and DateTime objects, so I need to properly configure the serializers in the ExecutionEnvironment:

env.registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);
env.getConfig().addDefaultKryoSerializer(EntitonAtom.class, TBaseSerializer.class);
env.getConfig().addDefaultKryoSerializer(EntitonQuad.class, TBaseSerializer.class);

Those jobs don't work when I disable generic types and I get the following exception:

Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been 
disabled in the ExecutionConfig and type xxx.EntitonAtom is treated as a generic type.

 I have a couple of questions:
  • addDefaultKryoSerializer differs from registerTypeWithKryoSerializer in that addDefaultKryoSerializer also uses the passed serializer for subclasses of the configured class. Am I right? This is not very clear in the method's Javadoc...
  • How can I avoid that exception?
Best,
Flavio

[1] env.getConfig().disableGenericTypes();

Re: Serialization questions

zhangminglei
Hi, Flavio

  • addDefaultKryoSerializer differs from registerTypeWithKryoSerializer in that addDefaultKryoSerializer also uses the passed serializer for subclasses of the configured class. Am I right? This is not very clear in the method's Javadoc…

I don't think this is really a Flink issue; it comes from Kryo. addDefaultKryoSerializer corresponds to Kryo's addDefaultSerializer(int[].class, IntArraySerializer.class), whereas registerTypeWithKryoSerializer corresponds to Kryo's register(int.class, new IntSerializer()). With register, you explicitly assign an id to that type together with its serializer. A default serializer only tells Kryo which serializer to use when this type has to be serialized; Kryo then registers the serializer implicitly. The advantage of using register [1] shows when you set setRegistrationRequired(true), which is recommended (and will be the default in 5.0): you then have to register every occurring type explicitly.
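To make the distinction concrete, here is a toy model in plain Java. It is not Kryo or Flink code: `ToySerializerRegistry`, `ToyBase`, `ToyAtom` and the string "serializer names" are all made up for illustration. It assumes that default serializers are matched with an assignability check (so subclasses are covered), while a registration applies only to the exact class.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model only (NOT the Kryo or Flink implementation).
// Serializers are represented by plain names to keep the sketch short.
class ToySerializerRegistry {
    // Exact-class registrations, in the spirit of register(...)
    private final Map<Class<?>, String> registered = new LinkedHashMap<>();
    // Default serializers, in the spirit of addDefaultSerializer(...)
    private final Map<Class<?>, String> defaults = new LinkedHashMap<>();

    void register(Class<?> type, String serializerName) {
        registered.put(type, serializerName);
    }

    void addDefaultSerializer(Class<?> type, String serializerName) {
        defaults.put(type, serializerName);
    }

    // Assumed lookup order: exact registration first, then the first
    // default whose configured class is a supertype of the requested type.
    String resolve(Class<?> type) {
        String exact = registered.get(type);
        if (exact != null) {
            return exact;
        }
        for (Map.Entry<Class<?>, String> entry : defaults.entrySet()) {
            if (entry.getKey().isAssignableFrom(type)) {
                return entry.getValue();
            }
        }
        return "fallback";
    }
}

// Hypothetical class hierarchy used only for the demonstration.
class ToyBase {}
class ToyAtom extends ToyBase {}

class ToyRegistryDemo {
    public static void main(String[] args) {
        ToySerializerRegistry withDefault = new ToySerializerRegistry();
        withDefault.addDefaultSerializer(ToyBase.class, "base-default");
        // The subclass picks up the default configured for its superclass.
        System.out.println(withDefault.resolve(ToyAtom.class));

        ToySerializerRegistry withRegister = new ToySerializerRegistry();
        withRegister.register(ToyBase.class, "base-exact");
        // The registration covers ToyBase only, not ToyAtom.
        System.out.println(withRegister.resolve(ToyBase.class));
        System.out.println(withRegister.resolve(ToyAtom.class));
    }
}
```

If that assumption holds for the real libraries, Flavio's reading of the two methods is right: the default serializer also applies to subclasses, whereas a registration does not.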

  • How can I avoid that exception?
You can try the following, without calling disableGenericTypes, and see what happens.

env.registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);
env.registerTypeWithKryoSerializer(EntitonAtom.class, TBaseSerializer.class);
env.registerTypeWithKryoSerializer(EntitonQuad.class, TBaseSerializer.class);



Cheers
Minglei

Re: Serialization questions

Flavio Pompermaier
Hi Minglei,
using registerTypeWithKryoSerializer with the three classes works (without disableGenericTypes), but I would like to avoid Kryo serialization if that helps speed up job performance,
and so I'd like to be able to run all jobs with disableGenericTypes.

Best,
Flavio

Re: Serialization questions

Andrey Zagrebin
Hi Flavio,

Given the current implementation of `disableGenericTypes`, the exception you get is expected: Kryo still has to be used for `EntitonAtom`, which is classified as a generic type (one that Flink cannot serialise itself). At the moment you cannot specify exceptions to this check.

If you want to control which classes Kryo may be used for while still activating `disableGenericTypes`, you can currently create your own type info in which you provide your own Flink serialiser (one that extends TypeSerializer), which can use Kryo internally (KryoSerializer).
You can do this with a class annotation; that way Flink should not treat it as a generic type.

Cheers,
Andrey

Re: Serialization questions

Andrey Zagrebin
Another way would be to make `EntitonAtom` implement `org.apache.flink.types.Value` (and thereby `IOReadableWritable`) using custom (Kryo) serialisation.
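A rough sketch of the shape this takes, using only the JDK so that it runs on its own. `AtomLike` and its two fields are hypothetical stand-ins, since the real `EntitonAtom` is not shown in this thread. The `write`/`read` pair mirrors the two methods of `IOReadableWritable`; in Flink the parameters would be `DataOutputView` and `DataInputView`, which extend `java.io.DataOutput` and `java.io.DataInput`. Note that this sketch encodes the fields by hand rather than delegating to Kryo as suggested above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for EntitonAtom; the fields are invented.
// A real implementation would declare "implements org.apache.flink.types.Value".
class AtomLike {
    String subject = "";
    long timestampMillis;

    // Same shape as IOReadableWritable.write(DataOutputView)
    void write(DataOutput out) throws IOException {
        out.writeUTF(subject);
        out.writeLong(timestampMillis);
    }

    // Same shape as IOReadableWritable.read(DataInputView);
    // fields must be read in the order they were written.
    void read(DataInput in) throws IOException {
        subject = in.readUTF();
        timestampMillis = in.readLong();
    }
}

class AtomLikeDemo {
    // Serialises the object to bytes and reads it back into a fresh instance.
    static AtomLike roundTrip(AtomLike original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            AtomLike copy = new AtomLike();
            copy.read(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        AtomLike atom = new AtomLike();
        atom.subject = "example";
        atom.timestampMillis = 1531900000000L;
        AtomLike copy = roundTrip(atom);
        System.out.println(copy.subject + " @ " + copy.timestampMillis);
    }
}
```

The class needs a no-argument constructor so that an empty instance can be created before `read` fills it in, which is what `roundTrip` does here.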
