Flink Pojo Serialization for Map Values


KristoffSC
Hi,
I would like to ask about Flink POJO serialization, as described in [1].

I have a case where my custom event source produces events described by this
POJO:

public class DataPoint
{
        public long timestamp;
        public double value;
        public BadPojo badPojo = new BadPojo();

        public DataPoint() {}

}

The BadPojo class is something like this:
public class BadPojo {

    private final String fieldA = "X";
}

This is a case where Flink, using the default configuration, should fall back to
Kryo, and it does.
In the logs I can see entries like:
org.apache.flink.api.java.typeutils.TypeExtractor - class org.home.streaming.events.BadPojo does not contain a getter for field fieldA

So this is the expected result.

However, when I change the DataPoint class to use
public Map<String, BadPojo> badPojo = new HashMap<>();

instead of a direct BadPojo field, I no longer see any log entries complaining about the BadPojo
class.

In this case, the DataPoint class looks like this:
import java.util.HashMap;
import java.util.Map;

public class DataPoint
{
        public long timestamp;
        public double value;
        public Map<String, BadPojo> badPojo = new HashMap<>();

        public DataPoint() {}

}

My questions:
1. What actually happens here?
2. Which serializer does Flink use?
3. How should maps be handled in a POJO definition to get the best serialization
performance (assuming that I do need access to that map)?

Thanks,
Krzysztof


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#pojos



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Flink Pojo Serialization for Map Values

KristoffSC
Hi,
Any ideas about that one?




Re: Flink Pojo Serialization for Map Values

Theo
In reply to this post by KristoffSC

Hi Krzysztof,

Your problems arise from Java type erasure. If DataPoint has a Map<String, BadPojo>, all Flink's type system sees is a Map, i.e. Map<Object, Object>.

So in the first case, with DataPoint having an explicit member of type "BadPojo", Flink deduces "DataPoint" to be a PojoType with three fields, where the field "badPojo" itself is of type GenericType<BadPojo>; thus, that field is serialized via Kryo.

In the second case, DataPoint is still a PojoType and your "badPojo" field is also a GenericType, but this time of type "GenericType<java.util.Map>". So there are no complaints about BadPojo here: the entire map is already serialized via Kryo, and Flink does not analyze any further, so it never sees BadPojo. => No win for you :)
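To see this difference directly, here is a minimal sketch, assuming a Flink 1.x dependency (the thread uses 1.9), that walks the TypeInformation Flink derives for a class and lists every spot treated as a GenericType, i.e. serialized with Kryo. The names FindKryoFields, kryoFields, DataPointWithField and DataPointWithMap are hypothetical; only TypeInformation.of and the type-info accessors are Flink API.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.api.java.typeutils.MapTypeInfo;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FindKryoFields {

    // Same shape as BadPojo in the thread: a private field without a getter.
    public static class BadPojo {
        private final String fieldA = "X";
    }

    // First variant from the thread: BadPojo as a direct field.
    public static class DataPointWithField {
        public long timestamp;
        public double value;
        public BadPojo badPojo = new BadPojo();
        public DataPointWithField() {}
    }

    // Second variant from the thread: BadPojo as a Map value.
    public static class DataPointWithMap {
        public long timestamp;
        public double value;
        public Map<String, BadPojo> badPojo = new HashMap<>();
        public DataPointWithMap() {}
    }

    /** Lists every place in the type tree that Flink treats as a GenericType (Kryo). */
    public static List<String> kryoFields(Class<?> clazz) {
        List<String> out = new ArrayList<>();
        collect(clazz.getSimpleName(), TypeInformation.of(clazz), out);
        return out;
    }

    private static void collect(String path, TypeInformation<?> info, List<String> out) {
        if (info instanceof GenericTypeInfo) {
            out.add(path + " -> " + info);
        } else if (info instanceof CompositeType) {
            // POJOs, tuples and rows: recurse into each field by name.
            CompositeType<?> composite = (CompositeType<?>) info;
            String[] names = composite.getFieldNames();
            for (int i = 0; i < composite.getArity(); i++) {
                collect(path + "." + names[i], composite.getTypeAt(i), out);
            }
        } else if (info instanceof MapTypeInfo) {
            // A real MapTypeInfo: inspect key and value types separately.
            MapTypeInfo<?, ?> map = (MapTypeInfo<?, ?>) info;
            collect(path + "<key>", map.getKeyTypeInfo(), out);
            collect(path + "<value>", map.getValueTypeInfo(), out);
        } else if (info instanceof ListTypeInfo) {
            collect(path + "<element>", ((ListTypeInfo<?>) info).getElementTypeInfo(), out);
        }
    }

    public static void main(String[] args) {
        System.out.println(kryoFields(DataPointWithField.class));
        System.out.println(kryoFields(DataPointWithMap.class));
    }
}
```

Run on both variants, it should report the badPojo field as a GenericType of BadPojo in the first case and as GenericType<java.util.Map> in the second, which is why the BadPojo log message disappears.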

In the second case, you need to explicitly tell Flink that your "badPojo" field is a map and should be detected as a "MapType". If Flink detects it as a MapType, it will again complain about BadPojo itself, so you are back to square one and still need to fix BadPojo to finally avoid Kryo. :)
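Fixing BadPojo itself means following the POJO rules from [1]: a public class with a public no-argument constructor whose fields are either public or accessible through a getter and a setter. Since a setter cannot assign a final field, the field below is non-final. A minimal sketch, assuming Flink 1.x; GoodPojo and isAnalyzedAsPojo are hypothetical names.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;

public class PojoFixSketch {

    // Hypothetical POJO-compliant replacement for BadPojo:
    // public class, public no-arg constructor, private non-final field with getter and setter.
    public static class GoodPojo {
        private String fieldA = "X";

        public GoodPojo() {}

        public String getFieldA() { return fieldA; }

        public void setFieldA(String fieldA) { this.fieldA = fieldA; }
    }

    /** True if Flink's type extraction treats the class as a POJO rather than a GenericType. */
    public static boolean isAnalyzedAsPojo(Class<?> clazz) {
        return TypeInformation.of(clazz) instanceof PojoTypeInfo;
    }

    public static void main(String[] args) {
        System.out.println(isAnalyzedAsPojo(GoodPojo.class));
    }
}
```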

I once wrote a small utility function when I had to tell Flink about a POJO from an external library that contained a Map, in order to serialize it efficiently:



    import java.util.HashMap;
    import java.util.Map;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;

    import org.apache.flink.api.common.functions.InvalidTypesException;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.typeutils.PojoTypeInfo;

    public final class FlinkTypeHints {

        /**
         * Flink's Types.POJO function accepts, next to a class, a Map of field names to types, but then creates
         * the POJO type for the class with the specified fields only. This utility instead creates the POJO type
         * normally, replaces the type of the given field with the given type, and keeps all other fields as
         * they are generated normally.
         * @throws InvalidTypesException if the class is not a valid POJO or the field does not exist
         */
        public static <T, V> TypeInformation<T> pojoTypeWithElementReplacement(Class<T> pojo, String replacementFieldName, TypeInformation<V> replacementFieldType) {
            final PojoTypeInfo<T> pojoType = (PojoTypeInfo<T>) Types.POJO(pojo);

            // Field name -> type as derived by Flink; HashMap::new guarantees a mutable map for remove/put below.
            final Map<String, TypeInformation<?>> pojoFieldTypes = IntStream
                    .range(0, pojoType.getArity())
                    .mapToObj(pojoType::getPojoFieldAt)
                    .collect(Collectors.toMap(
                            pojoField -> pojoField.getField().getName(),
                            pojoField -> pojoField.getTypeInformation(),
                            (first, second) -> first,
                            HashMap::new
                    ));

            final TypeInformation<?> oldTypeForElements = pojoFieldTypes.remove(replacementFieldName);
            if (oldTypeForElements == null) {
                throw new InvalidTypesException("Expected " + replacementFieldName + " field to exist in order to replace it properly with custom type infos");
            }
            pojoFieldTypes.put(replacementFieldName, replacementFieldType);

            return Types.POJO(pojo, pojoFieldTypes);
        }
    }


In your case, you could call it like this:

TypeInformation<DataPoint> pojoMapType = FlinkTypeHints.pojoTypeWithElementReplacement(DataPoint.class, "badPojo", Types.MAP(Types.STRING, TypeExtractor.createTypeInfo(BadPojo.class)));

It is a bit less verbose if BadPojo really were a PojoType:

TypeInformation<DataPoint> pojoMapType = FlinkTypeHints.pojoTypeWithElementReplacement(DataPoint.class, "badPojo", Types.MAP(Types.STRING, Types.POJO(BadPojo.class)));

If the POJO is, e.g., returned from a MapFunction, you can write something like stream.map(myMapFunctionReturningDataPoint).returns(pojoMapType);
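When every field is under your control, Types.POJO(Class, Map) can also be used directly with a complete field map; as the Javadoc of the utility notes, only the fields in that map become part of the POJO type, so all of them must be listed. A sketch assuming Flink 1.x DataStream APIs, a hypothetical POJO-compliant GoodPojo as the map value, and hypothetical names ReturnsSketch and dataPointType; the identity lambda stands in for myMapFunctionReturningDataPoint.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ReturnsSketch {

    // Hypothetical POJO-compliant value type: public no-arg constructor, field with getter and setter.
    public static class GoodPojo {
        private String fieldA = "X";
        public GoodPojo() {}
        public String getFieldA() { return fieldA; }
        public void setFieldA(String fieldA) { this.fieldA = fieldA; }
    }

    public static class DataPoint {
        public long timestamp;
        public double value;
        public Map<String, GoodPojo> badPojo = new HashMap<>();
        public DataPoint() {}
    }

    /** Declares all three fields, so "badPojo" is a MapTypeInfo instead of GenericType<java.util.Map>. */
    public static TypeInformation<DataPoint> dataPointType() {
        Map<String, TypeInformation<?>> fields = new HashMap<>();
        fields.put("timestamp", Types.LONG);
        fields.put("value", Types.DOUBLE);
        fields.put("badPojo", Types.MAP(Types.STRING, Types.POJO(GoodPojo.class)));
        return Types.POJO(DataPoint.class, fields);
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataPoint point = new DataPoint();
        point.timestamp = 1L;
        point.value = 2.0;
        point.badPojo.put("a", new GoodPojo());

        DataStream<DataPoint> points = env
                .fromCollection(Collections.singletonList(point), dataPointType())
                // Identity lambda standing in for a real MapFunction returning DataPoint.
                .map(dp -> dp)
                .returns(dataPointType());

        points.print();
        env.execute("returns sketch");
    }
}
```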



Note that I wrote this for Flink 1.9. I have read that Flink now has a new type system in some places, but I haven't checked it out yet and have no idea what changed.

Best regards
Theo




----- Original Message -----
From: "KristoffSC" <[hidden email]>
To: "user" <[hidden email]>
Sent: Monday, 13 July 2020 23:06:20
Subject: Flink Pojo Serialization for Map Values


Re: Flink Pojo Serialization for Map Values

KristoffSC
Theo,
thank you for the clarification and code examples.
I actually suspected that this was caused by Java type erasure.

What bothers me, though, is that Flink fell back to Kryo
silently in my case, without any information in the logs. We actually
found it just by luck.







Re: Flink Pojo Serialization for Map Values

Theo
Hi Krzysztof,

That's why I always aim to set
   env.getConfig().disableGenericTypes();
in my streaming jobs. This way, the job fails early if GenericTypes are used anywhere. (They are bad for performance, so I try to avoid them everywhere.)
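The same fail-fast check can be run on a single type without starting a job. With generic types disabled, Flink's GenericTypeInfo refuses to create a Kryo serializer and throws an UnsupportedOperationException, and a POJO serializer creates serializers for all its fields. A minimal sketch, assuming Flink 1.x where TypeInformation#createSerializer(ExecutionConfig) is available (it may be deprecated or replaced in later releases); GenericTypesCheck and serializableWithoutKryo are hypothetical names.

```java
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.util.HashMap;
import java.util.Map;

public class GenericTypesCheck {

    public static class BadPojo {
        private final String fieldA = "X";
    }

    // The Map-based DataPoint from the thread; its badPojo field becomes GenericType<java.util.Map>.
    public static class DataPoint {
        public long timestamp;
        public double value;
        public Map<String, BadPojo> badPojo = new HashMap<>();
        public DataPoint() {}
    }

    /** Returns true if Flink can build a serializer for the type without falling back to Kryo. */
    public static boolean serializableWithoutKryo(TypeInformation<?> info) {
        ExecutionConfig config = new ExecutionConfig();
        config.disableGenericTypes();
        try {
            info.createSerializer(config);
            return true;
        } catch (UnsupportedOperationException e) {
            // Thrown when a GenericType is reached while generic types are disabled.
            System.err.println(e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(serializableWithoutKryo(TypeInformation.of(DataPoint.class)));
    }
}
```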

Sadly, if you build streaming jobs that read from Kafka, you need Flink 1.10.1 or Flink 1.11 to be able to use this setting (https://issues.apache.org/jira/browse/FLINK-15904).

As my project is still on Flink 1.9, I currently have an e2e-like test with "disableGenericTypes" set, which differs from the real job only in that it uses a different input instead of the Kafka consumer, so I can make sure the rest of my pipeline contains no generic types.
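The e2e-like test could be structured roughly like this, assuming Flink 1.9-era DataStream APIs (fromCollection, DiscardingSink) and a local execution environment. The shared part of the pipeline lives in a hypothetical buildPipeline method, which the real job would feed from the Kafka consumer and the test feeds from a collection; Reading, buildPipeline and runWithTestInput are illustrative names, not from the thread.

```java
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

import java.util.Arrays;

public class PipelineGenericTypesTest {

    // Hypothetical input record; a valid POJO (public fields, public no-arg constructor).
    public static class Reading {
        public long timestamp;
        public double value;

        public Reading() {}

        public Reading(long timestamp, double value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Hypothetical "remaining part of the pipeline", shared by the real job and this test.
    static DataStream<Double> buildPipeline(DataStream<Reading> input) {
        return input.map(r -> r.value * 2).returns(Types.DOUBLE);
    }

    /** Runs the shared pipeline on a collection source with generic types disabled. */
    public static JobExecutionResult runWithTestInput() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Any GenericType (Kryo fallback) in the pipeline now makes the job fail before it runs.
        env.getConfig().disableGenericTypes();

        DataStream<Reading> input = env.fromCollection(
                Arrays.asList(new Reading(1L, 1.0), new Reading(2L, 2.5)),
                TypeInformation.of(Reading.class));

        buildPipeline(input).addSink(new DiscardingSink<>());
        try {
            return env.execute("generic-types check");
        } catch (Exception e) {
            throw new IllegalStateException("Pipeline failed, possibly due to a generic type", e);
        }
    }

    public static void main(String[] args) {
        runWithTestInput();
    }
}
```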

Best regards
Theo



----- Original Message -----
From: "KristoffSC" <[hidden email]>
To: "user" <[hidden email]>
Sent: Thursday, 16 July 2020 20:57:09
Subject: Re: Flink Pojo Serialization for Map Values
