How to gracefully avoid "Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type"?

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

How to gracefully avoid "Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type"?

Dongwon Kim-2
Hi,

The following program compiles and runs w/o exceptions:
public class Test {

  public static class A {
    private int n;

    public A() { }
    public int getN() {  return n;  }
    public void setN(int n) {  this.n = n;  }
  }

  public static class B {
    private List<A> lst;

    public B() { }
    public List<A> getLst() {  return lst;  }
    public void setLst(List<A> lst) {  this.lst = lst;  }
  }

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

    env.fromElements(new B())
      .print();

    env.execute();
  }
}

When I add the following line, 
env.getConfig().disableGenericTypes();
then the program shows me an exception:
Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:319)
at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:311)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:970)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromElements(StreamExecutionEnvironment.java:871)
at Test.main(Test.java:29)
 
To avoid this exception, I found that I have to declare a type factory like:
  public static class BTypeFactory extends TypeInfoFactory<B> {
    @Override
    public TypeInformation<B> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
      return Types.POJO(
        B.class,
        ImmutableMap.<String, TypeInformation<?>>builder()
          .put("lst", Types.LIST(Types.POJO(A.class)))
        .build()
      );
    }
  }
and give it to class B as follows:
  @TypeInfo(BTypeFactory.class)
  public static class B {
 
Is there no other way but to declare BTypeFactory in such cases?
I don't like the way I have to type a field name twice, one for a member variable and the other for an Map entry in TypeInfoFactory.

Thanks in advance,

Dongwon
Reply | Threaded
Open this post in threaded view
|

Fwd: How to gracefully avoid "Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type"?

Dongwon Kim-2
Any advice would be appreciated :)

Thanks,

Dongwon

---------- Forwarded message ---------
From: Dongwon Kim <[hidden email]>
Date: Mon, Dec 14, 2020 at 11:27 PM
Subject: How to gracefully avoid "Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type"?
To: user <[hidden email]>


Hi,

The following program compiles and runs w/o exceptions:
public class Test {

  public static class A {
    private int n;

    public A() { }
    public int getN() {  return n;  }
    public void setN(int n) {  this.n = n;  }
  }

  public static class B {
    private List<A> lst;

    public B() { }
    public List<A> getLst() {  return lst;  }
    public void setLst(List<A> lst) {  this.lst = lst;  }
  }

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

    env.fromElements(new B())
      .print();

    env.execute();
  }
}

When I add the following line, 
env.getConfig().disableGenericTypes();
then the program shows me an exception:
Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:319)
at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:311)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:970)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromElements(StreamExecutionEnvironment.java:871)
at Test.main(Test.java:29)
 
To avoid this exception, I found that I have to declare a type factory like:
  public static class BTypeFactory extends TypeInfoFactory<B> {
    @Override
    public TypeInformation<B> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
      return Types.POJO(
        B.class,
        ImmutableMap.<String, TypeInformation<?>>builder()
          .put("lst", Types.LIST(Types.POJO(A.class)))
        .build()
      );
    }
  }
and give it to class B as follows:
  @TypeInfo(BTypeFactory.class)
  public static class B {
 
Is there no other way but to declare BTypeFactory in such cases?
I don't like the way I have to type a field name twice, one for a member variable and the other for an Map entry in TypeInfoFactory.

Thanks in advance,

Dongwon
Reply | Threaded
Open this post in threaded view
|

Re: How to gracefully avoid "Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type"?

Arvid Heise-3
Hi Dongwon,

inferring the type information of Java classes is quite messy. At first, it seems like that should work out the box as you are only using <A> as the type of the list, right? However, there is no way of knowing if you didn't use a subclass of A. Of course, if A was final, it might be possible to exclude this case but you quickly go down a rabbit hole.
It gets especially bad if you consider that your classes evolve over time. What happens if A is first final, but you later decide to subclass it? How should Flink map old data to the new hierarchy? Flink falls back to Kryo for most cases, which is why you need generic types. However, that is rather inefficient unless you register all possible classes beforehand [1].

To avoid such limitations, I always recommend schema-first approaches when moving a PoC to production code. First figure out what kind of data you actually want to transfer. Then, settle for a serializer [2]. Then, create the schema and let the classes be generated (on-the-fly).

I usually do it in two ways: if I write a rather generic program, I try to use Table API, which optimizes everything in a Row has one of the most memory efficient representations. If Table API is not sufficient, I fall back to Avro and use Avro specific records [3].



On Mon, Jan 4, 2021 at 6:49 PM Dongwon Kim <[hidden email]> wrote:
Any advice would be appreciated :)

Thanks,

Dongwon

---------- Forwarded message ---------
From: Dongwon Kim <[hidden email]>
Date: Mon, Dec 14, 2020 at 11:27 PM
Subject: How to gracefully avoid "Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type"?
To: user <[hidden email]>


Hi,

The following program compiles and runs w/o exceptions:
public class Test {

  public static class A {
    private int n;

    public A() { }
    public int getN() {  return n;  }
    public void setN(int n) {  this.n = n;  }
  }

  public static class B {
    private List<A> lst;

    public B() { }
    public List<A> getLst() {  return lst;  }
    public void setLst(List<A> lst) {  this.lst = lst;  }
  }

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

    env.fromElements(new B())
      .print();

    env.execute();
  }
}

When I add the following line, 
env.getConfig().disableGenericTypes();
then the program shows me an exception:
Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:319)
at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:311)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:970)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromElements(StreamExecutionEnvironment.java:871)
at Test.main(Test.java:29)
 
To avoid this exception, I found that I have to declare a type factory like:
  public static class BTypeFactory extends TypeInfoFactory<B> {
    @Override
    public TypeInformation<B> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
      return Types.POJO(
        B.class,
        ImmutableMap.<String, TypeInformation<?>>builder()
          .put("lst", Types.LIST(Types.POJO(A.class)))
        .build()
      );
    }
  }
and give it to class B as follows:
  @TypeInfo(BTypeFactory.class)
  public static class B {
 
Is there no other way but to declare BTypeFactory in such cases?
I don't like the way I have to type a field name twice, one for a member variable and the other for an Map entry in TypeInfoFactory.

Thanks in advance,

Dongwon


--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   
Reply | Threaded
Open this post in threaded view
|

Re: How to gracefully avoid "Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type"?

Dongwon Kim-2
Hi Arvid,

Thanks for the very detailed explanation and tips!

inferring the type information of Java classes is quite messy. At first, it seems like that should work out the box as you are only using <A> as the type of the list, right? However, there is no way of knowing if you didn't use a subclass of A. Of course, if A was final, it might be possible to exclude this case but you quickly go down a rabbit hole.
Do you think Scala is a better option in that regard?

To avoid such limitations, I always recommend schema-first approaches when moving a PoC to production code. First figure out what kind of data you actually want to transfer. Then, settle for a serializer [2]. Then, create the schema and let the classes be generated (on-the-fly).
I usually do it in two ways: if I write a rather generic program, I try to use Table API, which optimizes everything in a Row has one of the most memory efficient representations.
I have to work with DataStream API as I need a custom trigger which is not supported in Table API AFAIK.

If Table API is not sufficient, I fall back to Avro and use Avro specific records [3].
I used to define Avro records in .avro files and generate Java classes to use in my Flink application, but recently decided to move to POJOs for some reasons (e.g. custom Java annotations, etc).

So it seems like I'd better satisfy with BTypeFactory in my original question unless I'm willing to move to Scala or Avro, huh?

Best,

Dongwon 

On Fri, Jan 8, 2021 at 6:22 PM Arvid Heise <[hidden email]> wrote:
Hi Dongwon,

inferring the type information of Java classes is quite messy. At first, it seems like that should work out the box as you are only using <A> as the type of the list, right? However, there is no way of knowing if you didn't use a subclass of A. Of course, if A was final, it might be possible to exclude this case but you quickly go down a rabbit hole.
It gets especially bad if you consider that your classes evolve over time. What happens if A is first final, but you later decide to subclass it? How should Flink map old data to the new hierarchy? Flink falls back to Kryo for most cases, which is why you need generic types. However, that is rather inefficient unless you register all possible classes beforehand [1].

To avoid such limitations, I always recommend schema-first approaches when moving a PoC to production code. First figure out what kind of data you actually want to transfer. Then, settle for a serializer [2]. Then, create the schema and let the classes be generated (on-the-fly).

I usually do it in two ways: if I write a rather generic program, I try to use Table API, which optimizes everything in a Row has one of the most memory efficient representations. If Table API is not sufficient, I fall back to Avro and use Avro specific records [3].



On Mon, Jan 4, 2021 at 6:49 PM Dongwon Kim <[hidden email]> wrote:
Any advice would be appreciated :)

Thanks,

Dongwon

---------- Forwarded message ---------
From: Dongwon Kim <[hidden email]>
Date: Mon, Dec 14, 2020 at 11:27 PM
Subject: How to gracefully avoid "Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type"?
To: user <[hidden email]>


Hi,

The following program compiles and runs w/o exceptions:
public class Test {

  public static class A {
    private int n;

    public A() { }
    public int getN() {  return n;  }
    public void setN(int n) {  this.n = n;  }
  }

  public static class B {
    private List<A> lst;

    public B() { }
    public List<A> getLst() {  return lst;  }
    public void setLst(List<A> lst) {  this.lst = lst;  }
  }

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

    env.fromElements(new B())
      .print();

    env.execute();
  }
}

When I add the following line, 
env.getConfig().disableGenericTypes();
then the program shows me an exception:
Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:319)
at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:311)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:970)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromElements(StreamExecutionEnvironment.java:871)
at Test.main(Test.java:29)
 
To avoid this exception, I found that I have to declare a type factory like:
  public static class BTypeFactory extends TypeInfoFactory<B> {
    @Override
    public TypeInformation<B> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
      return Types.POJO(
        B.class,
        ImmutableMap.<String, TypeInformation<?>>builder()
          .put("lst", Types.LIST(Types.POJO(A.class)))
        .build()
      );
    }
  }
and give it to class B as follows:
  @TypeInfo(BTypeFactory.class)
  public static class B {
 
Is there no other way but to declare BTypeFactory in such cases?
I don't like the way I have to type a field name twice, one for a member variable and the other for an Map entry in TypeInfoFactory.

Thanks in advance,

Dongwon


--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   
Reply | Threaded
Open this post in threaded view
|

Re: How to gracefully avoid "Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type"?

Arvid Heise-3
Do you think Scala is a better option in that regard?
I'm not sure if scala is better in this regard. Sure you could use sealed classes but I don't know if the schema inference is using it. Maybe [hidden email] knows more?

I used to define Avro records in .avro files and generate Java classes to use in my Flink application, but recently decided to move to POJOs for some reasons (e.g. custom Java annotations, etc).
If it's just about annotations: You can use custom Java annotations with Avro generated classes by setting "javaAnnotations" property. [1] You can use it on records and fields.
You can even provide your own velocity template [2] to add more features to the generation.


On Tue, Jan 12, 2021 at 11:16 AM Dongwon Kim <[hidden email]> wrote:
Hi Arvid,

Thanks for the very detailed explanation and tips!

inferring the type information of Java classes is quite messy. At first, it seems like that should work out the box as you are only using <A> as the type of the list, right? However, there is no way of knowing if you didn't use a subclass of A. Of course, if A was final, it might be possible to exclude this case but you quickly go down a rabbit hole.
Do you think Scala is a better option in that regard?

To avoid such limitations, I always recommend schema-first approaches when moving a PoC to production code. First figure out what kind of data you actually want to transfer. Then, settle for a serializer [2]. Then, create the schema and let the classes be generated (on-the-fly).
I usually do it in two ways: if I write a rather generic program, I try to use Table API, which optimizes everything in a Row has one of the most memory efficient representations.
I have to work with DataStream API as I need a custom trigger which is not supported in Table API AFAIK.

If Table API is not sufficient, I fall back to Avro and use Avro specific records [3].
I used to define Avro records in .avro files and generate Java classes to use in my Flink application, but recently decided to move to POJOs for some reasons (e.g. custom Java annotations, etc).

So it seems like I'd better satisfy with BTypeFactory in my original question unless I'm willing to move to Scala or Avro, huh?

Best,

Dongwon 

On Fri, Jan 8, 2021 at 6:22 PM Arvid Heise <[hidden email]> wrote:
Hi Dongwon,

inferring the type information of Java classes is quite messy. At first, it seems like that should work out the box as you are only using <A> as the type of the list, right? However, there is no way of knowing if you didn't use a subclass of A. Of course, if A was final, it might be possible to exclude this case but you quickly go down a rabbit hole.
It gets especially bad if you consider that your classes evolve over time. What happens if A is first final, but you later decide to subclass it? How should Flink map old data to the new hierarchy? Flink falls back to Kryo for most cases, which is why you need generic types. However, that is rather inefficient unless you register all possible classes beforehand [1].

To avoid such limitations, I always recommend schema-first approaches when moving a PoC to production code. First figure out what kind of data you actually want to transfer. Then, settle for a serializer [2]. Then, create the schema and let the classes be generated (on-the-fly).

I usually do it in two ways: if I write a rather generic program, I try to use Table API, which optimizes everything in a Row has one of the most memory efficient representations. If Table API is not sufficient, I fall back to Avro and use Avro specific records [3].



On Mon, Jan 4, 2021 at 6:49 PM Dongwon Kim <[hidden email]> wrote:
Any advice would be appreciated :)

Thanks,

Dongwon

---------- Forwarded message ---------
From: Dongwon Kim <[hidden email]>
Date: Mon, Dec 14, 2020 at 11:27 PM
Subject: How to gracefully avoid "Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type"?
To: user <[hidden email]>


Hi,

The following program compiles and runs w/o exceptions:
public class Test {

  public static class A {
    private int n;

    public A() { }
    public int getN() {  return n;  }
    public void setN(int n) {  this.n = n;  }
  }

  public static class B {
    private List<A> lst;

    public B() { }
    public List<A> getLst() {  return lst;  }
    public void setLst(List<A> lst) {  this.lst = lst;  }
  }

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

    env.fromElements(new B())
      .print();

    env.execute();
  }
}

When I add the following line, 
env.getConfig().disableGenericTypes();
then the program shows me an exception:
Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:319)
at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:311)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:970)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromElements(StreamExecutionEnvironment.java:871)
at Test.main(Test.java:29)
 
To avoid this exception, I found that I have to declare a type factory like:
  public static class BTypeFactory extends TypeInfoFactory<B> {
    @Override
    public TypeInformation<B> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
      return Types.POJO(
        B.class,
        ImmutableMap.<String, TypeInformation<?>>builder()
          .put("lst", Types.LIST(Types.POJO(A.class)))
        .build()
      );
    }
  }
and give it to class B as follows:
  @TypeInfo(BTypeFactory.class)
  public static class B {
 
Is there no other way but to declare BTypeFactory in such cases?
I don't like the way I have to type a field name twice, one for a member variable and the other for an Map entry in TypeInfoFactory.

Thanks in advance,

Dongwon


--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   


--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   
Reply | Threaded
Open this post in threaded view
|

Re: How to gracefully avoid "Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type"?

Dongwon Kim-2
Hi Arvid,

Okay, I'd better get back to Avro as you suggested!
Thanks for the tips regarding Avro.

Best,

Dongwon

On Wed, Jan 13, 2021 at 3:28 AM Arvid Heise <[hidden email]> wrote:
Do you think Scala is a better option in that regard?
I'm not sure if scala is better in this regard. Sure you could use sealed classes but I don't know if the schema inference is using it. Maybe [hidden email] knows more?

I used to define Avro records in .avro files and generate Java classes to use in my Flink application, but recently decided to move to POJOs for some reasons (e.g. custom Java annotations, etc).
If it's just about annotations: You can use custom Java annotations with Avro generated classes by setting "javaAnnotations" property. [1] You can use it on records and fields.
You can even provide your own velocity template [2] to add more features to the generation.


On Tue, Jan 12, 2021 at 11:16 AM Dongwon Kim <[hidden email]> wrote:
Hi Arvid,

Thanks for the very detailed explanation and tips!

inferring the type information of Java classes is quite messy. At first, it seems like that should work out the box as you are only using <A> as the type of the list, right? However, there is no way of knowing if you didn't use a subclass of A. Of course, if A was final, it might be possible to exclude this case but you quickly go down a rabbit hole.
Do you think Scala is a better option in that regard?

To avoid such limitations, I always recommend schema-first approaches when moving a PoC to production code. First figure out what kind of data you actually want to transfer. Then, settle for a serializer [2]. Then, create the schema and let the classes be generated (on-the-fly).
I usually do it in two ways: if I write a rather generic program, I try to use Table API, which optimizes everything in a Row has one of the most memory efficient representations.
I have to work with DataStream API as I need a custom trigger which is not supported in Table API AFAIK.

If Table API is not sufficient, I fall back to Avro and use Avro specific records [3].
I used to define Avro records in .avro files and generate Java classes to use in my Flink application, but recently decided to move to POJOs for some reasons (e.g. custom Java annotations, etc).

So it seems like I'd better satisfy with BTypeFactory in my original question unless I'm willing to move to Scala or Avro, huh?

Best,

Dongwon 

On Fri, Jan 8, 2021 at 6:22 PM Arvid Heise <[hidden email]> wrote:
Hi Dongwon,

inferring the type information of Java classes is quite messy. At first, it seems like that should work out the box as you are only using <A> as the type of the list, right? However, there is no way of knowing if you didn't use a subclass of A. Of course, if A was final, it might be possible to exclude this case but you quickly go down a rabbit hole.
It gets especially bad if you consider that your classes evolve over time. What happens if A is first final, but you later decide to subclass it? How should Flink map old data to the new hierarchy? Flink falls back to Kryo for most cases, which is why you need generic types. However, that is rather inefficient unless you register all possible classes beforehand [1].

To avoid such limitations, I always recommend schema-first approaches when moving a PoC to production code. First figure out what kind of data you actually want to transfer. Then, settle for a serializer [2]. Then, create the schema and let the classes be generated (on-the-fly).

I usually do it in two ways: if I write a rather generic program, I try to use Table API, which optimizes everything in a Row has one of the most memory efficient representations. If Table API is not sufficient, I fall back to Avro and use Avro specific records [3].



On Mon, Jan 4, 2021 at 6:49 PM Dongwon Kim <[hidden email]> wrote:
Any advice would be appreciated :)

Thanks,

Dongwon

---------- Forwarded message ---------
From: Dongwon Kim <[hidden email]>
Date: Mon, Dec 14, 2020 at 11:27 PM
Subject: How to gracefully avoid "Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type"?
To: user <[hidden email]>


Hi,

The following program compiles and runs w/o exceptions:
public class Test {

  public static class A {
    private int n;

    public A() { }
    public int getN() {  return n;  }
    public void setN(int n) {  this.n = n;  }
  }

  public static class B {
    private List<A> lst;

    public B() { }
    public List<A> getLst() {  return lst;  }
    public void setLst(List<A> lst) {  this.lst = lst;  }
  }

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

    env.fromElements(new B())
      .print();

    env.execute();
  }
}

When I add the following line, 
env.getConfig().disableGenericTypes();
then the program shows me an exception:
Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:319)
at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:311)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:970)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromElements(StreamExecutionEnvironment.java:871)
at Test.main(Test.java:29)
 
To avoid this exception, I found that I have to declare a type factory like:
  public static class BTypeFactory extends TypeInfoFactory<B> {
    @Override
    public TypeInformation<B> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
      return Types.POJO(
        B.class,
        ImmutableMap.<String, TypeInformation<?>>builder()
          .put("lst", Types.LIST(Types.POJO(A.class)))
        .build()
      );
    }
  }
and give it to class B as follows:
  @TypeInfo(BTypeFactory.class)
  public static class B {
 
Is there no other way but to declare BTypeFactory in such cases?
I don't like the way I have to type a field name twice, one for a member variable and the other for an Map entry in TypeInfoFactory.

Thanks in advance,

Dongwon


--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   


--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng