POJO with private fields and toAppendStream of StreamTableEnvironment

Sung Gon Yi
According to https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset,
a Table can be converted into a DataStream of a POJO data type.

I would like to use a POJO class with private fields. Is this officially supported?
At the moment it does not work.

Code:
—————
// Read aa.csv as a table with two columns: name (STRING) and value (INT)
CsvTableSource as = CsvTableSource.builder()
        .path("aa.csv")
        .field("name", STRING)
        .field("value", INT)
        .build();
Table aa = tEnv.fromTableSource(as);
// Convert the table into an append-only DataStream of P
tEnv.toAppendStream(aa, P.class);
—————
public class P implements Serializable {
    private String name;
    private Integer value;
}
—————

With the code above, I get the following error message:
==========
Exception in thread "main" org.apache.flink.table.api.TableException: Arity [2] of result [ArrayBuffer(String, Integer)] does not match the number[1] of requested type [GenericType<aa.P>].
at org.apache.flink.table.api.TableEnvironment.generateRowConverterFunction(TableEnvironment.scala:1165)
at org.apache.flink.table.api.StreamTableEnvironment.getConversionMapper(StreamTableEnvironment.scala:423)
at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:936)
at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:866)
at org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:202)
at org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:156)
at ...
==========

When the fields of class P are changed to public, it works:
—————
public class P implements Serializable {
    public String name;
    public Integer value;
}
—————

Thanks,
skonmeme



Re: POJO with private fields and toAppendStream of StreamTableEnvironment

Timo Walther
Hi Sung,

Private fields are only supported if you provide matching getters and setters. Otherwise, you need to use `Row.class` and perform the mapping manually, via reflection, in a subsequent map() function.

Regards,
Timo
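
The reflection-based mapping mentioned in the reply above could look roughly like the following. This is only a minimal sketch with no Flink dependency: the `Object[]` argument stands in for a Flink `Row`, and the names `ReflectiveMappingSketch` and `toP` are hypothetical, chosen for illustration.

```java
import java.io.Serializable;
import java.lang.reflect.Field;

public class ReflectiveMappingSketch {

    // The class from the question: private fields, no getters or setters.
    public static class P implements Serializable {
        private String name;
        private Integer value;

        @Override
        public String toString() {
            return "P(" + name + ", " + value + ")";
        }
    }

    // Copies values[i] into the private field named fieldNames[i].
    // Assumption: Object[] is a stand-in for Flink's Row, to keep the sketch self-contained.
    public static P toP(String[] fieldNames, Object[] values) {
        P p = new P();
        try {
            for (int i = 0; i < fieldNames.length; i++) {
                Field f = P.class.getDeclaredField(fieldNames[i]);
                f.setAccessible(true); // allow writing to the private field
                f.set(p, values[i]);
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot map values to P", e);
        }
        return p;
    }

    public static void main(String[] args) {
        P p = toP(new String[] {"name", "value"}, new Object[] {"a", 1});
        System.out.println(p); // prints P(a, 1)
    }
}
```

In a Flink job, the same loop would presumably live inside the map() function applied to the stream returned by `tEnv.toAppendStream(aa, Row.class)`, reading each value with `Row.getField(i)` instead of indexing an array.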


Re: POJO with private fields and toAppendStream of StreamTableEnvironment

Sung Gon Yi
In reply to this post by Timo Walther
Sorry. I sent an empty reply.

I tried again with getters and setters, and it works. Thanks.

—————
import lombok.Getter;
import lombok.Setter;

import java.io.Serializable;

@Getter @Setter
public class P implements Serializable {
    private String name;
    private Integer value;
}
—————
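
For readers who do not use Lombok: class-level `@Getter @Setter` generates one getter and one setter per non-static field, so the class above should be equivalent to the hand-written version sketched below. The wrapper class `ExplicitAccessorsSketch` and the helper `hasGetterAndSetter` are hypothetical; the helper only approximates the accessor part of the POJO rule and is not Flink's actual type-extraction logic.

```java
import java.io.Serializable;

public class ExplicitAccessorsSketch {

    // Hand-written counterpart of the Lombok-annotated class P.
    public static class P implements Serializable {
        private String name;
        private Integer value;

        public P() {}

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public Integer getValue() { return value; }
        public void setValue(Integer value) { this.value = value; }
    }

    // Hypothetical helper: true if the type exposes public getX() and setX(fieldType) methods.
    public static boolean hasGetterAndSetter(Class<?> type, String field, Class<?> fieldType) {
        String suffix = Character.toUpperCase(field.charAt(0)) + field.substring(1);
        try {
            type.getMethod("get" + suffix);
            type.getMethod("set" + suffix, fieldType);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        P p = new P();
        p.setName("a");
        p.setValue(1);
        System.out.println(p.getName() + "," + p.getValue());                    // prints a,1
        System.out.println(hasGetterAndSetter(P.class, "name", String.class));   // prints true
        System.out.println(hasGetterAndSetter(P.class, "value", Integer.class)); // prints true
    }
}
```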



On 29 Apr 2019, at 11:12 PM, Timo Walther <[hidden email]> wrote:

Hi Sung,

private fields are only supported if you specify getters and setters accordingly. Otherwise you need to use `Row.class` and perform the mapping in a subsequent map() function manually via reflection.

Regards,
Timo


On 29.04.19 at 15:44, Sung Gon Yi wrote:
I would like to use a POJO class with private fields. Is this officially supported?