Getting Errors when using keyby()

Getting Errors when using keyby()

Sridhar Chellappa
I have a DataStream on which I apply a CEP pattern and then group the results using keyBy(). The DataStream element type is a POJO:

public class DataStreamObject {
    private String field1;
    private String field2;

    public DataStreamObject(String field1, String field2) {
        this.field1 = field1;
        this.field2 = field2;
    }

    public void setField1(String field1) {
        this.field1 = field1;
    }

    public String getField1() {
        return field1;
    }

    public void setField2(String field2) {
        this.field2 = field2;
    }

    public String getField2() {
        return field2;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof DataStreamObject)) return false;

        DataStreamObject that = (DataStreamObject) o;

        if (!getField1().equals(that.getField1())) return false;
        return getField2().equals(that.getField2());
    }

    @Override
    public int hashCode() {
        int result = getField1().hashCode();
        result = 31 * result + getField2().hashCode();
        return result;
    }

    @Override
    public String toString() {
        return "DriverSameAsCustomer{" +
                "field1='" + field1 + '\'' +
                ", field2='" + field2 + '\'' +
                '}';
    }
}

When I submit my Flink job, I get the following error:


This type (GenericType<com.foo.DataStreamObject>) cannot be used as key.
org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:330)
org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:294)
com.foo.Main.main(Main.java:66)


As I understand it, I do not need to implement the Key interface if the class is a POJO (which it is).

Please help me understand where I am going wrong and suggest a fix.


Re: Getting Errors when using keyby()

Dawid Wysakowicz
Hi Sridhar,

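Your class is missing a default (no-argument) constructor, so it is not a valid POJO in Flink. Flink therefore treats it as a GenericType, as the error message shows, and that type cannot be used as a key here.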

The POJO requirements are listed here: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/api_concepts.html#pojos
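As a rough sketch of the fix, assuming nothing else in your class violates the POJO rules, adding a public no-argument constructor should be enough. The wrapper class PojoCheck and the helper hasPublicNoArgConstructor below are hypothetical names used only for illustration. They are not part of Flink and cover only this one rule, not Flink's full type analysis. equals(), hashCode() and toString() are left out for brevity.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;

public class PojoCheck {

    // Same fields as the class in the question, plus a no-argument constructor.
    public static class DataStreamObject {
        private String field1;
        private String field2;

        // Public no-argument constructor: the piece the original class lacked.
        public DataStreamObject() {
        }

        public DataStreamObject(String field1, String field2) {
            this.field1 = field1;
            this.field2 = field2;
        }

        public String getField1() {
            return field1;
        }

        public void setField1(String field1) {
            this.field1 = field1;
        }

        public String getField2() {
            return field2;
        }

        public void setField2(String field2) {
            this.field2 = field2;
        }
    }

    // Hypothetical helper (not a Flink API): true if the class declares a
    // public constructor that takes no parameters.
    public static boolean hasPublicNoArgConstructor(Class<?> clazz) {
        for (Constructor<?> c : clazz.getDeclaredConstructors()) {
            if (c.getParameterCount() == 0 && Modifier.isPublic(c.getModifiers())) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(hasPublicNoArgConstructor(DataStreamObject.class));
    }
}
```

A check like this can go into a unit test so that a later refactoring does not silently drop the constructor. Whether Flink accepts the class as a POJO still depends on the remaining rules in the documentation linked above.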


> On 12 Jul 2017, at 19:54, Sridhar Chellappa <[hidden email]> wrote:

