what's wrong with my pojo when it's used by flink? Thanks

大森林

I want to experiment with the "aggregate" operator.

My code is:

Aggregate.java

UserActionLogPOJO.java


The error I got is:

Exception in thread "main" org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException: Cannot reference field by field expression on GenericType<UserActionLogPOJO>Field expressions are only supported on POJO types, tuples, and case classes. (See the Flink documentation on what is considered a POJO.)
    at org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:193)
    at org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator.<init>(ComparableAggregator.java:67)
    at org.apache.flink.streaming.api.datastream.KeyedStream.max(KeyedStream.java:836)
    at Aggregate.main(Aggregate.java:52)

Process finished with exit code 1

Could you tell me what is wrong with UserActionLogPOJO.java?

Thanks for your help.


Re: what's wrong with my pojo when it's used by flink? Thanks

Arvid Heise-3
Hi 大森林,

If you look at the full logs, you'll see:

3989 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor [] - class org.apache.flink.test.checkpointing.UserActionLogPOJO does not contain a getter for field itemId
3999 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor [] - class org.apache.flink.test.checkpointing.UserActionLogPOJO does not contain a setter for field itemId
3999 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor [] - Class class org.apache.flink.test.checkpointing.UserActionLogPOJO cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
Exception in thread "main" org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException: Cannot reference field by field expression on GenericType<org.apache.flink.test.checkpointing.UserActionLogPOJO>Field expressions are only supported on POJO types, tuples, and case classes. (See the Flink documentation on what is considered a POJO.)
    at org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:193)
    at org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator.<init>(ComparableAggregator.java:67)
    at org.apache.flink.streaming.api.datastream.KeyedStream.max(KeyedStream.java:809)
    at org.apache.flink.test.checkpointing.Aggregate.main(UnalignedCheckpointITCase.java:701)
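To see in isolation what the "does not contain a getter/setter" lines are complaining about, here is a small reflection sketch that uses only the JDK. It is a rough approximation, not Flink's TypeExtractor: Flink's own matching differs in details. The names `AccessorCheck`, `BrokenLog`, `getItem`, and `setItem` are hypothetical, since the original UserActionLogPOJO.java is not shown in this thread.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class AccessorCheck {

    // Hypothetical example class: the field is private, but the accessor
    // names do not match the field name "itemId".
    public static class BrokenLog {
        private String itemId;

        public String getItem() { return itemId; }                    // expected: getItemId
        public void setItem(String itemId) { this.itemId = itemId; }  // expected: setItemId
    }

    // Returns one message per missing accessor of each non-public instance field.
    public static List<String> missingAccessors(Class<?> type) {
        List<String> problems = new ArrayList<>();
        for (Field field : type.getDeclaredFields()) {
            int mods = field.getModifiers();
            // Public fields need no accessors; static and transient fields are skipped here.
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || Modifier.isPublic(mods)) {
                continue;
            }
            String name = field.getName();
            String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
            if (!hasPublicMethod(type, "get" + suffix) && !hasPublicMethod(type, "is" + suffix)) {
                problems.add("no getter for field " + name);
            }
            if (!hasPublicMethod(type, "set" + suffix, field.getType())) {
                problems.add("no setter for field " + name);
            }
        }
        return problems;
    }

    // Class.getMethod only finds public methods, including inherited ones.
    private static boolean hasPublicMethod(Class<?> type, String name, Class<?>... params) {
        try {
            type.getMethod(name, params);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        missingAccessors(BrokenLog.class).forEach(System.out::println);
    }
}
```

Running a check like this against your own class before submitting the job can point at the offending field without digging through the INFO logs.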

The issue is that the setters and getters in your POJO do not match the field names. It's easiest to let your IDE generate them for you. For example, if you keep the current field names, you need to add:

public String getItemId() {
    return itemId;
}

public void setItemId(String itemId) {
    this.itemId = itemId;
}

public void setPrice(int price) {
    this.price = price;
}

public void setUserId(String userId) {
    this.userId = userId;
}

public int getPrice() {
    return price;
}

public String getUserId() {
    return userId;
}

As you can see, none of your current getters/setters follows the JavaBeans specification, so you need to add all of them.
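Putting the pieces together, a complete class might look like the sketch below. The field names and types (`userId`, `itemId`, `price`) are taken from the accessors above; everything else is an assumption, because the original UserActionLogPOJO.java is not included in this thread. Besides matching getters and setters for non-public fields, Flink also expects a POJO class to be public and to have a public no-argument constructor.

```java
// A sketch of a Flink-compatible POJO, assuming the three fields named in this thread.
public class UserActionLogPOJO {

    // Private fields are fine as long as each has a matching public getter and setter.
    private String userId;
    private String itemId;
    private int price;

    // Public no-argument constructor, needed so instances can be created reflectively.
    public UserActionLogPOJO() {
    }

    // Convenience constructor (optional, added here for illustration).
    public UserActionLogPOJO(String userId, String itemId, int price) {
        this.userId = userId;
        this.itemId = itemId;
        this.price = price;
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getItemId() {
        return itemId;
    }

    public void setItemId(String itemId) {
        this.itemId = itemId;
    }

    public int getPrice() {
        return price;
    }

    public void setPrice(int price) {
        this.price = price;
    }

    @Override
    public String toString() {
        return "UserActionLogPOJO{userId=" + userId + ", itemId=" + itemId + ", price=" + price + "}";
    }
}
```

With accessors named like this, the class should be analyzed as a POJO type rather than a GenericType, which is what a field expression passed to KeyedStream.max needs in order to resolve a field by name.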

On Fri, Oct 2, 2020 at 4:39 PM 大森林 <[hidden email]> wrote:


--

Arvid Heise | Senior Java Developer



Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   
Re: what's wrong with my pojo when it's used by flink? Thanks

大森林
Thanks for your help.
I have solved this problem with your guidance.

Please close this issue.

Many thanks!


------------------ Original Message ------------------
From: "Arvid Heise" <[hidden email]>;
Sent: Saturday, October 3, 2020, 1:35 AM
To: "大森林" <[hidden email]>;
Cc: "user" <[hidden email]>;
Subject: Re: what's wrong with my pojo when it's used by flink? Thanks
