Correctly serializing "Number" as state in ProcessFunction


Miguel Araújo-2
Hi everyone,

I have a ProcessFunction which needs to store different number types for different keys, e.g., some keys need to store an integer while others need to store a double.

I tried to use java.lang.Number as the type for the ValueState, but I got the expected "No fields were detected for class java.lang.Number so it cannot be used as a POJO type and must be processed as GenericType." 

I have the feeling that this is not the right approach, but the exact type to be stored is only known at runtime, which makes things a bit trickier. Is there a way to register these classes correctly, or is it preferable to use different ValueStates for different types?

Thanks,
Miguel

Klemens Muthmann
Hi,

I guess this is more of a Java problem than a Flink problem. If you want a quick and dirty solution, you could implement a class such as:

public class Value {
    private boolean isLongSet = false;
    private long longValue = 0L;
    private boolean isIntegerSet = false;
    private int intValue = 0;

    public Value(final long value) {
        setLong(value);
    }

    public void setLong(final long value) {
        longValue = value;
        isLongSet = true;
    }

    public long getLong() {
        if (isLongSet) {
            return longValue;
        }
        // Every code path must return or throw; fail loudly if no long was stored.
        throw new IllegalStateException("No long value has been set");
    }

    // Add the same methods for int.
    // To satisfy the POJO requirements you will also need to add a no-argument constructor as well as getters and setters for the boolean flags.
}
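For the int/double case from the original question, the same tagged-wrapper idea might look roughly like the sketch below. The class name NumericValue and all of its members are made up for illustration and are not part of Flink. It uses public fields and a public no-argument constructor; in a real job the class would also have to be declared public in its own file to qualify as a POJO.

```java
// Hypothetical tagged wrapper; names are illustrative, not a Flink API.
class NumericValue {
    // Tag telling which of the two payload fields is valid.
    public boolean isInt;
    public int intValue;
    public double doubleValue;

    // Public no-argument constructor, as required by the POJO rules.
    public NumericValue() {
    }

    static NumericValue ofInt(int value) {
        NumericValue v = new NumericValue();
        v.isInt = true;
        v.intValue = value;
        return v;
    }

    static NumericValue ofDouble(double value) {
        NumericValue v = new NumericValue();
        v.isInt = false;
        v.doubleValue = value;
        return v;
    }

    // Returns the stored number boxed as Integer or Double, depending on the tag.
    // An if/else is used on purpose: a ternary over Integer and Double would
    // unbox both branches and always produce a Double.
    Number toNumber() {
        if (isInt) {
            return Integer.valueOf(intValue);
        }
        return Double.valueOf(doubleValue);
    }
}
```

With this shape, a single state of type NumericValue could hold either kind of number, and the caller recovers the original type through toNumber().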

I guess a cleaner solution would be possible using a custom Kryo serializer as explained here: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/custom_serializers.html

Regards
      Klemens




Arvid Heise-4
Hi Miguel,

As Klemens said, this is a rather general problem, independent of Flink: how do you map polymorphism in serialization?

Flink doesn't have an answer of its own, as polymorphic types like this are discouraged. (A Number can have arbitrarily many subclasses: how do you distinguish them except by class name? That adds a ton of overhead.) The easiest solution in your case is to convert the ints into doubles.
Alternatively, you can use Kryo, which dictionary-encodes the classes and also limits the possible subclasses.
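On the int-to-double suggestion: a double has a 53-bit significand, so every 32-bit int converts to a double and back without loss, while long values beyond 2^53 may not. A small plain-Java check of that claim, with class and method names invented for this illustration:

```java
// Illustrative check; names are hypothetical.
class IntToDoubleCheck {
    // True if converting the int to double and back yields the same int.
    static boolean intRoundTrips(int value) {
        return (int) (double) value == value;
    }

    // True if converting the long to double and back yields the same long.
    static boolean longRoundTrips(long value) {
        return (long) (double) value == value;
    }

    public static void main(String[] args) {
        System.out.println(intRoundTrips(Integer.MAX_VALUE));  // true
        System.out.println(intRoundTrips(Integer.MIN_VALUE));  // true
        System.out.println(longRoundTrips((1L << 53) + 1));    // false
    }
}
```

So storing everything as a double is safe for int inputs, but it would need a second look if long values ever had to be stored as well.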


Miguel Araújo-2
Thanks for your replies. I agree this is a somewhat general problem. 
I posted it here as I was trying to register the valid subclasses in Kryo but I couldn't get the message to go away, i.e., everything worked correctly but there was the complaint that GenericType serialization was being used.

This is how I was registering these types:
env.getConfig.registerKryoType(classOf[java.lang.Integer])
env.getConfig.registerKryoType(classOf[java.lang.Double])
and this is the message I got on every event:

flink-task-manager_1  | 2021-04-23 16:48:29.274 [Processor Function 1 (1/2)#0] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - No fields were detected for class java.lang.Number so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.

In the meantime, I've changed my approach to reuse a protobuf type I already had as part of my input event.

Once again, thanks for your replies because they gave me the right perspective.




rmetzger0
A quick comment on the Kryo type registration and the messages you are seeing: the messages are expected. What the message says is that the type is not being serialized with Flink's POJO serializer; Flink is falling back to Kryo instead.
Since you are registering all the subclasses of Number that you are using (Integer, Double), you'll get better performance (or at least less CPU load) with Kryo. So if you want to keep using Kryo, you are doing everything right (and you generally won't be able to use Flink's POJO serializer for Number types).
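To illustrate why registering the concrete classes helps: with a fixed, known set of subclasses, a serializer can write a small type tag in front of each value instead of the full class name. The sketch below shows that idea in plain Java. It is not Kryo's actual wire format, and TaggedNumberCodec and its tag constants are hypothetical names chosen for this example.

```java
import java.nio.ByteBuffer;

// Illustrative codec; not Kryo's format and not a Flink API.
class TaggedNumberCodec {
    static final byte TAG_INT = 0;
    static final byte TAG_DOUBLE = 1;

    // Encodes the value as a one-byte type tag followed by the raw payload.
    static byte[] encode(Number value) {
        if (value instanceof Integer) {
            return ByteBuffer.allocate(1 + Integer.BYTES)
                    .put(TAG_INT)
                    .putInt(value.intValue())
                    .array();
        }
        if (value instanceof Double) {
            return ByteBuffer.allocate(1 + Double.BYTES)
                    .put(TAG_DOUBLE)
                    .putDouble(value.doubleValue())
                    .array();
        }
        // Only the "registered" subclasses are supported.
        throw new IllegalArgumentException("Unregistered type: " + value.getClass().getName());
    }

    // Reads the tag first, then decodes the payload as the matching type.
    static Number decode(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        byte tag = buffer.get();
        switch (tag) {
            case TAG_INT:
                return Integer.valueOf(buffer.getInt());
            case TAG_DOUBLE:
                return Double.valueOf(buffer.getDouble());
            default:
                throw new IllegalArgumentException("Unknown tag: " + tag);
        }
    }
}
```

Here an Integer takes 5 bytes and a Double 9 bytes, whereas prefixing each value with a name such as "java.lang.Integer" would add 17 characters per record.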
