Serialization problem: Using generic that extends a class on POJO.

Ido Bar Av

Hi,

 

We're using Flink 1.3.1 and are trying to pass a POJO that has a generic field through the pipeline (see details in the complete example below).

We have the class Foo<SomeKey extends BarKey>, and when sending a subclass with a specific SomeKey, we get the following exception:

 

 

java.lang.RuntimeException: Cannot instantiate class.
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:201)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:395)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:206)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:48)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Can not set ….BarKey field …Foo.someKey to java.lang.Object
    at java.lang.reflect.Field.set(Field.java:764)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.initializeFields(PojoSerializer.java:209)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:197)
    ... 10 more

 

 

If I understand correctly, the deserializer used for the someKey field creates a plain Object (before filling it), ignoring the fact that SomeKey extends BarKey, and then fails when trying to assign that Object to the BarKey-typed field.
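
The root-cause line of the trace can be reproduced without Flink. The sketch below is only an illustration under stated assumptions: ErasedFieldDemo and trySet are hypothetical names, the nested classes are trimmed stand-ins for the ones in the example, and the type parameter is renamed K so that it is not confused with the SomeKey class. It shows that a field declared with a bounded type variable has the erased type BarKey, so assigning a plain java.lang.Object to it through reflection fails with an IllegalArgumentException, the same kind of failure that Field.set reports above.

```java
import java.lang.reflect.Field;

public class ErasedFieldDemo {

    // Hypothetical, trimmed stand-ins for the classes in the example.
    public static class BarKey {}

    public static class Foo<K extends BarKey> {
        public K someKey;
    }

    /** Reflectively assigns value to Foo.someKey and reports the outcome. */
    static String trySet(Object value) {
        try {
            Field field = Foo.class.getField("someKey");
            field.set(new Foo<BarKey>(), value);
            return "ok";
        } catch (IllegalArgumentException e) {
            // Thrown when the value is not an instance of the field's erased type.
            return "IllegalArgumentException";
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        // The erased type of the field is the bound, not Object.
        System.out.println(Foo.class.getField("someKey").getType().getSimpleName()); // BarKey
        System.out.println(trySet(new BarKey())); // ok
        System.out.println(trySet(new Object())); // IllegalArgumentException
    }
}
```

This only demonstrates the Java reflection behavior; it says nothing about why Flink ends up creating an Object for the field.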

What is the correct approach for this situation?

 

Thanks,
Ido

Complete code example:

public class BarKey implements Serializable {
    public List<Long> valueList;

    public BarKey() {
    }

    public BarKey(long value) {
        super();
        valueList = new ArrayList<>();
        valueList.add(value);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        BarKey barKey = (BarKey) o;
        return Objects.equals(valueList, barKey.valueList);
    }

    @Override
    public int hashCode() {
        return Objects.hash(valueList);
    }
}

public class SomeKey extends BarKey implements Serializable {
    public Integer banana = 1;

    public SomeKey() {
    }

    public SomeKey(long value) {
        super(value);
    }
}

public class Foo<SomeKey extends BarKey> implements Serializable {

    public Foo() {}

    public SomeKey someKey;

    public Foo(SomeKey someKey) {
        this.someKey = someKey;
    }
}

public class FooFoo<SomeKey extends BarKey> extends Foo<SomeKey> implements Serializable {
    public FooFoo() {
    }

    public Integer grill = 12;

    public FooFoo(SomeKey someKey) {
        super(someKey);
    }
}

class MakeFoo extends ProcessFunction<Integer, Foo<BarKey>> implements Serializable {
    @Override
    public void processElement(Integer value, Context ctx, Collector<Foo<BarKey>> out) throws Exception {
        out.collect(new FooFoo<>(new SomeKey((long) value)));
    }
}

class FooProcessor extends ProcessFunction<Foo<BarKey>, Foo<BarKey>> implements Serializable {
    @Override
    public void processElement(Foo<BarKey> value, Context ctx, Collector<Foo<BarKey>> out) throws Exception {
        value.someKey.valueList.add(1L);
        out.collect(value);
    }
}

class FooBarSelector<SomeKey extends BarKey> implements KeySelector<Foo<SomeKey>, BarKey>, Serializable {
    @Override
    public BarKey getKey(Foo<SomeKey> value) throws Exception {
        return value.someKey;
    }
}

class FooBarSink implements Serializable, SinkFunction<Foo<BarKey>> {
    private static final Logger logger = LoggerFactory.getLogger(FooBarSink.class);

    public long dosomething = 0;

    @Override
    public void invoke(Foo<BarKey> value) throws Exception {
        dosomething += value.someKey.valueList.size();
        logger.warn("Sink {}", dosomething);
    }
}

 

 

Test code:

        environment.registerType(FooFoo.class); // Not certain if this is needed

        List<Integer> intlist = new ArrayList<>();
        intlist.add(3);
        intlist.add(5);

        DataStreamSource<Integer> streamSource = environment.fromCollection(intlist);

        streamSource.process(new MakeFoo())
             .keyBy(new FooBarSelector<>())
             .process(new FooProcessor())
             .addSink(new FooBarSink());

        environment.execute("Jobname-UT");


Re: Serialization problem: Using generic that extends a class on POJO.

Timo Walther
Hi Ido,

At first glance, I could not find any problem in your code, so it might be a bug. The "environment.registerType()" call is not needed in your case, because you have no generic types.

I will have a closer look at it tomorrow.

Regards,
Timo

In reply to Ido Bar Av's message of 14.08.17, 16:35.



Re: Serialization problem: Using generic that extends a class on POJO.

Timo Walther
Hi Ido,

Thank you for the good example reproducing the problem. I found a bug in Flink's type extraction logic and opened an issue for it [0]. The problem seems to be the bounded generics in both Foo and FooFoo: Foo.someKey gets the wrong type information, namely GenericType<java.lang.Object>.
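
For comparison, plain Java reflection does expose the bound of such a type variable. The following is a minimal sketch and not Flink's type extraction code: BoundedTypeVariableDemo, resolveFieldClass and Unbounded are hypothetical names, the classes are trimmed stand-ins for the ones in the example, and the type parameter is renamed K to avoid confusion with the SomeKey class. It resolves a field declared with a type variable to the variable's first bound, which is BarKey for Foo.someKey and Object only when no bound is declared.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

public class BoundedTypeVariableDemo {

    // Hypothetical, trimmed stand-ins for the classes in the example.
    public static class BarKey {}

    public static class Foo<K extends BarKey> {
        public K someKey;
    }

    // A type variable without a declared bound, for contrast.
    public static class Unbounded<T> {
        public T value;
    }

    /** Resolves a public field to a class, using the first bound of a type variable. */
    static Class<?> resolveFieldClass(Class<?> owner, String fieldName) {
        try {
            Field field = owner.getField(fieldName);
            Type generic = field.getGenericType();
            if (generic instanceof TypeVariable) {
                // getBounds() returns {Object.class} when no bound is declared.
                Type bound = ((TypeVariable<?>) generic).getBounds()[0];
                if (bound instanceof Class) {
                    return (Class<?>) bound;
                }
            }
            // Not a type variable (or a non-class bound): fall back to the erased type.
            return field.getType();
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolveFieldClass(Foo.class, "someKey").getSimpleName());     // BarKey
        System.out.println(resolveFieldClass(Unbounded.class, "value").getSimpleName()); // Object
    }
}
```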

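As a workaround until the issue is fixed, you can do the following:

import java.io.Serializable;
import java.lang.reflect.Type;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;

// The annotation makes Flink use the factory below instead of analyzing Foo as a POJO.
@TypeInfo(Foo.TypeFactory.class)
public class Foo<SomeKey extends BarKey> implements Serializable {

    public SomeKey someKey;

    public Foo() {}

    public Foo(SomeKey someKey) {
        this.someKey = someKey;
    }

    public static class TypeFactory extends TypeInfoFactory<Foo> {

        // Treat Foo as a generic type, which bypasses the PojoSerializer for it.
        @Override
        public TypeInformation<Foo> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
            return new GenericTypeInfo<>(Foo.class);
        }
    }
}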

I hope that helps.

Regards,
Timo


[0] https://issues.apache.org/jira/browse/FLINK-7450

In reply to Timo Walther's message of 14.08.17, 17:24.